Blob Blame History Raw
diff --git a/src/plugins/apache-httpd/src/client/htext.h b/src/plugins/apache-httpd/src/client/htext.h
index 1303e77e..853a146a 100644
--- a/src/plugins/apache-httpd/src/client/htext.h
+++ b/src/plugins/apache-httpd/src/client/htext.h
@@ -67,6 +67,8 @@ typedef enum
     HTEXTOP_NOHEAD, /* Disable HEAD requests */
 
     HTEXTOP_USE_COPY_FROM_SOURCE, /* Use Copy Source (fetching) instead of Copy Destination (pushing) */
+    HTEXTOP_COPY_DONE, /* If set, called after COPY operation is done */
+    HTEXTOP_COPY_DONE_DATA, /* Additional data to pass to transfer done function */
 
     HTEXTOP_SENTINEL, /* To mark the last one */
 
diff --git a/src/plugins/apache-httpd/src/client/htext_api.c b/src/plugins/apache-httpd/src/client/htext_api.c
index 1c3df75c..201a5e37 100644
--- a/src/plugins/apache-httpd/src/client/htext_api.c
+++ b/src/plugins/apache-httpd/src/client/htext_api.c
@@ -92,6 +92,8 @@ static option_entry option_definitions[] = {
 
     { OT_INT, (option_value) 0 }, /* HTEXTOP_NOHEAD         */
     { OT_INT, (option_value) 0 }, /* HTEXTOP_USE_COPY_FROM_SOURCE */
+    { OT_POINTER, (option_value) NULL }, /* HTEXTOP_COPY_DONE */
+    { OT_POINTER, (option_value) NULL }, /* HTEXTOP_COPY_DONE_DATA */
 };
 
 /**
@@ -156,6 +158,10 @@ void htext_destroy(htext_handle *handle)
     free(handle->error_string);
     free(handle->partial_total);
     free(handle->partial_done);
+    for (i = 0; i < handle->partials; ++i) {
+        free(handle->partial_rconn[i]);
+    }
+    free(handle->partial_rconn);
 
     for (i = 0; i < HTEXTOP_SENTINEL; ++i) {
         if (handle->options[i].type == OT_STRING)
@@ -249,6 +255,7 @@ int htext_perform(htext_handle *handle)
 {
     void* (*performer)(void *);
     const char *performer_str;
+    int i;
 
     /* Check status */
     switch (handle->status) {
@@ -263,9 +270,14 @@ int htext_perform(htext_handle *handle)
     free(handle->error_string);
     free(handle->partial_total);
     free(handle->partial_done);
+    for (i = 0; i < handle->partials; ++i) {
+        free(handle->partial_rconn[i]);
+    }
+    free(handle->partial_rconn);
     handle->error_string = NULL;
     handle->partial_done = NULL;
     handle->partial_total = NULL;
+    handle->partial_rconn = NULL;
     handle->partials = 0;
 
     /* Check we have source and destination */
@@ -326,7 +338,9 @@ int htext_perform(htext_handle *handle)
 
 void htext_abort(htext_handle *handle)
 {
-    /* TODO: Really abort */
+    /* TODO: Transfer is aborted by returning non-zero value from progress
+       function (this doesn't happen immediately and with curl easy interface
+       there is no better option except closing directly transfer socket) */
     handle->status = HTEXTS_ABORTED;
 }
 
diff --git a/src/plugins/apache-httpd/src/client/htext_common.c b/src/plugins/apache-httpd/src/client/htext_common.c
index 34a28353..219988dc 100644
--- a/src/plugins/apache-httpd/src/client/htext_common.c
+++ b/src/plugins/apache-httpd/src/client/htext_common.c
@@ -40,8 +40,6 @@ void htext_partial_clean(htext_chunk *p)
         curl_easy_cleanup(p->curl);
     if (p->fd)
         GETIO(p->handle) ->close(p->fd);
-    if (p->chunk_rconn && *(p->chunk_rconn))
-        free(*(p->chunk_rconn));
     if (p->error_string)
         free(p->error_string);
     if (p->http_response)
@@ -219,28 +217,37 @@ int htext_progress_callback(void *pp, curl_off_t dltotal, curl_off_t dlnow,
         *(partial->chunk_done) = dlnow;
     }
 
-    CURLcode res = CURLE_OK;
-    char *ip = NULL;
-    long port = 0;
-
-    if (res == CURLE_OK)
-        res = curl_easy_getinfo(partial->curl, CURLINFO_PRIMARY_IP, &ip);
-    if (res == CURLE_OK)
-        res = curl_easy_getinfo(partial->curl, CURLINFO_PRIMARY_PORT, &port);
-    if (res == CURLE_OK && ip) {
-        int len = strlen(ip)+15;
-        char *rconn = malloc(len);
-        if (strchr(ip, ':')) {
-            snprintf(rconn, len, "tcp:[%s]:%ld", ip, port);
-        } else {
-            snprintf(rconn, len, "tcp:%s:%ld", ip, port);
+    /* Add remote connection only after real data transfer starts
+       otherwise ip:port info could come previous (headnode) connection */
+    if (dltotal > 0 || dlnow > 0 || ultotal > 0 || ulnow > 0) {
+        CURLcode res = CURLE_OK;
+        char *ip = NULL;
+        long port = 0;
+
+	if (res == CURLE_OK)
+            res = curl_easy_getinfo(partial->curl, CURLINFO_PRIMARY_IP, &ip);
+        if (res == CURLE_OK)
+            res = curl_easy_getinfo(partial->curl, CURLINFO_PRIMARY_PORT, &port);
+
+	if (res == CURLE_OK && ip && port) {
+            char rconn[HTEXT_PERF_MARKER_MAX_RCONN_SIZE];
+            if (strchr(ip, ':')) {
+                snprintf(rconn, sizeof(rconn), "tcp:[%s]:%ld", ip, port);
+            } else {
+                snprintf(rconn, sizeof(rconn), "tcp:%s:%ld", ip, port);
+            }
+            htext_log(partial->handle, "connection %s, download %ld/%ld, upload %ld/%ld", rconn, dlnow, dltotal, ulnow, ultotal);
+            if (!*(partial->chunk_rconn))
+                *(partial->chunk_rconn) = calloc(sizeof(char), HTEXT_PERF_MARKER_MAX_RCONN_SIZE);
+            if (strcmp(*(partial->chunk_rconn), rconn))
+                strcpy(*(partial->chunk_rconn), rconn);
         }
-        htext_log(partial->handle, "connection %s, download %ld/%ld, upload %ld/%ld", rconn, dlnow, dltotal, ulnow, ultotal);
-        char *tmp = *(partial->chunk_rconn);
-        *(partial->chunk_rconn) = rconn;
-        if (tmp) free(tmp);
     }
 
+    /* Let curl abort terminated transfer (doesn't happen immediately) */
+    if (partial->handle->status == HTEXTS_ABORTED)
+        return 1;
+
     return 0;
 }
 
diff --git a/src/plugins/apache-httpd/src/client/htext_copy.c b/src/plugins/apache-httpd/src/client/htext_copy.c
index 88c4aacd..0be2f969 100644
--- a/src/plugins/apache-httpd/src/client/htext_copy.c
+++ b/src/plugins/apache-httpd/src/client/htext_copy.c
@@ -83,6 +83,7 @@ static size_t htext_copy_write_callback(char *buffer, size_t size, size_t nmemb,
             memset(perf_data, 0, sizeof(*perf_data));
             perf_data->index = -1;
             perf_data->transferred = -1;
+            perf_data->rconn[0] = '\0';
         }
         else if (strncasecmp("Timestamp:", p, 10) == 0)
         {
@@ -94,15 +95,17 @@ static size_t htext_copy_write_callback(char *buffer, size_t size, size_t nmemb,
         }
         else if (strncasecmp("Stripe Bytes Transferred:", p, 25) == 0)
         {
-            perf_data->transferred = atol(p + 26);
+            perf_data->transferred = atol(p + 25);
         }
         else if (strncasecmp("Total Stripe Count:", p, 19) == 0)
         {
-            perf_data->count = atoi(p + 20);
+            perf_data->count = atoi(p + 19);
         }
         else if (strncasecmp("RemoteConnections:", p, 18) == 0)
         {
-            strcpy(perf_data->rconn, p + 19);
+            char *c = p+18;
+            while (isspace(*c)) c++;
+            snprintf(perf_data->rconn, sizeof(perf_data->rconn), "%s", c);
         }
         // skip unused dCache perf markers
         else if (strncasecmp("State:", p, 6) == 0) {}
@@ -137,8 +140,12 @@ static size_t htext_copy_write_callback(char *buffer, size_t size, size_t nmemb,
 
                 handle->partial_done[perf_data->index] = perf_data->transferred;
                 handle->partial_total[perf_data->index] = 0;
-                handle->partial_rconn[perf_data->index] = perf_data->rconn[0] != '\0' ? strdup(perf_data->rconn) : NULL;
-
+                if (perf_data->rconn[0] != '\0') {
+                    if (!handle->partial_rconn[perf_data->index])
+                        handle->partial_rconn[perf_data->index] = calloc(sizeof(char), HTEXT_PERF_MARKER_MAX_RCONN_SIZE);
+                    if (strcmp(handle->partial_rconn[perf_data->index], perf_data->rconn))
+                        strcpy(handle->partial_rconn[perf_data->index], perf_data->rconn);
+                }
             }
             
         }
@@ -257,8 +264,15 @@ void *htext_copy_method(void *h)
 
     handle->http_status = control.http_status;
 
+    /* Call transfer done callback */
+    if (GETPTR(handle, HTEXTOP_COPY_DONE)) {
+        void (*copy_done)(htext_handle *, int, void *) = GETPTR(handle, HTEXTOP_COPY_DONE);
+        copy_done(handle, handle->status, GETPTR(handle, HTEXTOP_COPY_DONE_DATA));
+    }
+
     /* Clean up */
     htext_partial_clean(&control);
     curl_easy_cleanup(curl);
+
     return NULL ;
 }
diff --git a/src/plugins/apache-httpd/src/client/htext_get.c b/src/plugins/apache-httpd/src/client/htext_get.c
index 0fb49bde..ae9b686b 100644
--- a/src/plugins/apache-httpd/src/client/htext_get.c
+++ b/src/plugins/apache-httpd/src/client/htext_get.c
@@ -276,6 +276,13 @@ void *htext_get_method(void *h)
         handle->status = HTEXTS_SUCCEEDED;
     }
 
+    /* Call transfer done callback */
+    if (GETPTR(handle, HTEXTOP_COPY_DONE)) {
+        void (*copy_done)(htext_handle *, int, void *) = GETPTR(handle, HTEXTOP_COPY_DONE);
+        copy_done(handle, handle->status, GETPTR(handle, HTEXTOP_COPY_DONE_DATA));
+    }
+
+    /* Clean up */
     curl_easy_cleanup(curl);
     free(partial_array);
 
diff --git a/src/plugins/apache-httpd/src/client/htext_private.h b/src/plugins/apache-httpd/src/client/htext_private.h
index 0a406df7..b4369c1e 100644
--- a/src/plugins/apache-httpd/src/client/htext_private.h
+++ b/src/plugins/apache-httpd/src/client/htext_private.h
@@ -113,13 +113,14 @@ typedef struct htext_chunk htext_chunk;
  * and unprocessed last line from input data
  */
 #define HTEXT_PERF_MARKER_MAX_LINE_SIZE 1024
+#define HTEXT_PERF_MARKER_MAX_RCONN_SIZE 256
 struct htext_perf_marker_parsed_data
 {
     time_t latest; /* Timestamp for received data */
     int index; /* Stripe index */
     off_t transferred; /* Stripe bytes transferred */
     int count; /* Total stripe count */
-    char rconn[HTEXT_PERF_MARKER_MAX_LINE_SIZE]; /* Remote connections */
+    char rconn[HTEXT_PERF_MARKER_MAX_RCONN_SIZE]; /* Remote connections */
 
     char remaining[HTEXT_PERF_MARKER_MAX_LINE_SIZE]; /* Remaining input data from unterminated last line */
 };
diff --git a/src/plugins/apache-httpd/src/client/htext_put.c b/src/plugins/apache-httpd/src/client/htext_put.c
index 0c3e4c1d..94d8781a 100644
--- a/src/plugins/apache-httpd/src/client/htext_put.c
+++ b/src/plugins/apache-httpd/src/client/htext_put.c
@@ -377,6 +377,12 @@ void *htext_put_method(void *h)
         handle->status = HTEXTS_SUCCEEDED;
     }
 
+    /* Call transfer done callback */
+    if (GETPTR(handle, HTEXTOP_COPY_DONE)) {
+        void (*copy_done)(htext_handle *, int, void *) = GETPTR(handle, HTEXTOP_COPY_DONE);
+        copy_done(handle, handle->status, GETPTR(handle, HTEXTOP_COPY_DONE_DATA));
+    }
+
     curl_easy_cleanup(curl);
     free(partial_array);
 
diff --git a/src/plugins/apache-httpd/src/mod_lcgdm_disk/copy.c b/src/plugins/apache-httpd/src/mod_lcgdm_disk/copy.c
index dd586da6..cdc22c30 100644
--- a/src/plugins/apache-httpd/src/mod_lcgdm_disk/copy.c
+++ b/src/plugins/apache-httpd/src/mod_lcgdm_disk/copy.c
@@ -42,6 +42,8 @@ typedef struct
     request_rec *request;
     const char *source;
     const char *destination;
+    pthread_mutex_t *done_lock;
+    pthread_cond_t *done_cond;
 } dav_disk_copy_data;
 
 /* Custom I/O handler */
@@ -191,6 +193,30 @@ static void dav_disk_copy_log(htext_handle *handle, HTEXT_LOG_TYPE type,
     }
 }
 
+/**
+ * Called by htext when transfer is done
+ * @param handle The handle that triggers the call
+ * @param status The size of the message (it might not be NULL terminated)
+ * @param ud     User defined data
+ */
+static void dav_disk_copy_done(htext_handle *handle, int status, void *ud)
+{
+    dav_disk_copy_data *ddc = (dav_disk_copy_data*) ud;
+
+    ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, ddc->request,
+            "Remote copy done (%d): %s => %s status %i",
+            htext_http_code(handle), ddc->source, ddc->destination, status);
+
+    // send signal to finish perf marker loop
+ap_log_rerror(APLOG_MARK, APLOG_WARNING, 0, ddc->request, "EEEE Remote copy done (%d): %s => %s status %i", htext_http_code(handle), ddc->source, ddc->destination, status);
+    pthread_mutex_lock(ddc->done_lock);
+ap_log_rerror(APLOG_MARK, APLOG_WARNING, 0, ddc->request, "EEEE Remote copy done (%d): %s => %s status %i", htext_http_code(handle), ddc->source, ddc->destination, status);
+    pthread_cond_signal(ddc->done_cond);
+ap_log_rerror(APLOG_MARK, APLOG_WARNING, 0, ddc->request, "EEEE Remote copy done (%d): %s => %s status %i", htext_http_code(handle), ddc->source, ddc->destination, status);
+    pthread_mutex_unlock(ddc->done_lock);
+ap_log_rerror(APLOG_MARK, APLOG_WARNING, 0, ddc->request, "EEEE Remote copy done (%d): %s => %s status %i", htext_http_code(handle), ddc->source, ddc->destination, status);
+}
+
 /**
  * Generates and sends the performance clients following GridFTP format
  * http://www.ogf.org/documents/GFD.20.pdf (Appendix II), skipping the
@@ -201,14 +223,14 @@ static void dav_disk_send_performance_markers(dav_disk_copy_data *ddc, size_t n,
 {
     (void) total;
 
-    char buf[80];
+    char buf[256+2]; // HTEXT_PERF_MARKER_MAX_RCONN_SIZE+2
     time_t timestamp = time(NULL );
     size_t i;
 
     for (i = 0; i < n; ++i) {
         buf[0] = '\0';
         if (rconn && rconn[i]) {
-            snprintf(buf, 78, "\tRemoteConnections: %s", rconn[i]);
+            snprintf(buf, sizeof(buf)-2, "\tRemoteConnections: %s", rconn[i]);
             strcat(buf, "\n"); // we reserved space for endl
         }
         apr_brigade_printf(ddc->brigade, ap_filter_flush, ddc->output,
@@ -228,52 +250,81 @@ static void dav_disk_send_performance_markers(dav_disk_copy_data *ddc, size_t n,
 static dav_error *dav_disk_pool_and_feedback(htext_handle *handle,
         dav_disk_copy_data *ddc)
 {
-    int wait, status;
+    int status, interval = 1, done = 0;
     dav_error *error = NULL;
     const char *error_string;
 
-    do {
-        size_t *total, *done, n, i;
-        size_t globalDone, globalTotal;
-        char **rconn;
+    pthread_mutex_lock(ddc->done_lock); 
 
+    do {
         status = htext_status(handle);
-        switch (status) {
-            case HTEXTS_SUCCEEDED:
-            case HTEXTS_FAILED:
-            case HTEXTS_ABORTED:
-                wait = 0;
-                break;
-            default:
-                /* In the first go we need to set the reply */
-                if (ddc->request->status == 0) {
-                    ddc->request->status = HTTP_ACCEPTED;
-                    ap_set_content_type(ddc->request, "text/plain");
-                }
-                /* Print progress */
-                htext_progress(handle, &n, &total, &done, &rconn);
-                globalDone = globalTotal = 0;
-                for (i = 0; i < n; ++i) {
-                    globalDone += done[i];
-                    globalTotal += total[i];
-                }
-
-                dav_disk_send_performance_markers(ddc, n, total, done, rconn);
-
-                if (ap_fflush(ddc->output, ddc->brigade) == APR_SUCCESS) {
-                    ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, ddc->request,
-                            "COPY '%s' %lu/%lu", ddc->request->uri,
-                            (unsigned long) globalDone,
-                            (unsigned long) globalTotal);
-                    wait = 1;
-                    sleep(1);
-                }
-                else {
-                    wait = 0;
-                    htext_abort(handle);
-                }
+
+        if (status == HTEXTS_SUCCEEDED || status == HTEXTS_FAILED || status == HTEXTS_ABORTED) {
+            done = 1;
+        }
+        else {
+            struct timespec ts;
+            clock_gettime(CLOCK_REALTIME, &ts);
+            ts.tv_sec += interval;
+            interval = 5; // 5s between perf markers except for first one
+
+            int rc = pthread_cond_timedwait(ddc->done_cond, ddc->done_lock, &ts);
+            if (rc == 0) {
+                // transfer done, continue to get new status
+                continue;
+            }
+            else if (rc == ETIMEDOUT) {
+                // ongoing transfer, reached time to print new perfmarker
+            }
+            else {
+                ap_log_rerror(APLOG_MARK, APLOG_WARNING, 0, ddc->request,
+                        "Waiting for perf marker condition failed for COPY '%s' rc %i",
+                        ddc->request->uri, rc);
+                htext_abort(handle);
+                continue;
+            }
+        }
+
+        /* Always write last performance marker and in defined intervals */
+        {
+            size_t *total, *done, n, i;
+            size_t globalDone, globalTotal;
+            char **rconn;
+
+            /* In the first go we need to set the reply */
+            if (ddc->request->status == 0) {
+                ddc->request->status = HTTP_ACCEPTED;
+                ap_set_content_type(ddc->request, "text/plain");
+            }
+            /* Print progress */
+            htext_progress(handle, &n, &total, &done, &rconn);
+            dav_disk_send_performance_markers(ddc, n, total, done, rconn);
+
+            globalDone = globalTotal = 0;
+            for (i = 0; i < n; ++i) {
+                globalDone += done[i];
+                globalTotal += total[i];
+            }
+
+            if (ap_fflush(ddc->output, ddc->brigade) == APR_SUCCESS) {
+                ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, ddc->request,
+                        "COPY '%s' %lu/%lu", ddc->request->uri,
+                        (unsigned long) globalDone,
+                        (unsigned long) globalTotal);
+            }
+            else {
+                ap_log_rerror(APLOG_MARK, APLOG_WARNING, 0, ddc->request,
+                        "Failed to send perf marker for COPY '%s' %lu/%lu",
+                        ddc->request->uri,
+                        (unsigned long) globalDone,
+                        (unsigned long) globalTotal);
+                htext_abort(handle);
+            }
         }
-    } while (wait);
+
+    } while (!done);
+
+    pthread_mutex_unlock(ddc->done_lock);
 
     error_string = htext_error_string(handle);
     switch (status) {
@@ -439,6 +490,8 @@ static dav_error *dav_disk_generic_copy(const dav_resource* res, const char* upr
     dav_disk_copy_data ddc;
     request_rec* req = res->info->request;
     dav_disk_dir_conf* d_conf = res->info->d_conf;
+    pthread_mutex_t done_lock;
+    pthread_cond_t done_cond;
 
     int oldcancel;
     pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &oldcancel);
@@ -475,6 +528,9 @@ static dav_error *dav_disk_generic_copy(const dav_resource* res, const char* upr
     htext_setopt(handle, HTEXTOP_LOW_SPEED_TIME, d_conf->low_speed_time);
     htext_setopt(handle, HTEXTOP_LOW_SPEED_LIMIT, d_conf->low_speed_limit);
 
+    htext_setopt(handle, HTEXTOP_COPY_DONE, dav_disk_copy_done);
+    htext_setopt(handle, HTEXTOP_COPY_DONE_DATA, &ddc);
+
     htext_setopt(handle, HTEXTOP_LOGCALLBACK, dav_disk_copy_log);
     htext_setopt(handle, HTEXTOP_LOGCALLBACK_DATA, &ddc);
     htext_setopt(handle, HTEXTOP_VERBOSITY, 2);
@@ -486,13 +542,23 @@ static dav_error *dav_disk_generic_copy(const dav_resource* res, const char* upr
     ddc.request = req;
     ddc.source = src;
     ddc.destination = dst;
+    ddc.done_lock = &done_lock;
+    ddc.done_cond = &done_cond;
+
+    pthread_mutex_init(&done_lock, NULL);
+    pthread_cond_init(&done_cond, NULL);
 
     /* Run */
     if (htext_perform(handle) != 0) {
         error = dav_shared_new_error(req, NULL,
                 HTTP_INTERNAL_SERVER_ERROR, "Could not perform the action: %s",
                 htext_error_string(handle));
+
+        pthread_cond_destroy(&done_cond);
+        pthread_mutex_destroy(&done_lock);
+
         htext_destroy(handle);
+
         return error;
     }
 
@@ -504,6 +570,9 @@ static dav_error *dav_disk_generic_copy(const dav_resource* res, const char* upr
 
     /* Finish */
     htext_destroy(handle);
+
+    pthread_cond_destroy(&done_cond);
+    pthread_mutex_destroy(&done_lock);
     
     if (!error) {
         bkt = apr_bucket_eos_create(ddc.output->c->bucket_alloc);