diff --git a/src/plugins/apache-httpd/src/client/htext.h b/src/plugins/apache-httpd/src/client/htext.h
index 1303e77e..853a146a 100644
--- a/src/plugins/apache-httpd/src/client/htext.h
+++ b/src/plugins/apache-httpd/src/client/htext.h
@@ -67,6 +67,8 @@ typedef enum
HTEXTOP_NOHEAD, /* Disable HEAD requests */
HTEXTOP_USE_COPY_FROM_SOURCE, /* Use Copy Source (fetching) instead of Copy Destination (pushing) */
+ HTEXTOP_COPY_DONE, /* If set, called after COPY operation is done */
+ HTEXTOP_COPY_DONE_DATA, /* Additional data to pass to transfer done function */
HTEXTOP_SENTINEL, /* To mark the last one */
diff --git a/src/plugins/apache-httpd/src/client/htext_api.c b/src/plugins/apache-httpd/src/client/htext_api.c
index 1c3df75c..201a5e37 100644
--- a/src/plugins/apache-httpd/src/client/htext_api.c
+++ b/src/plugins/apache-httpd/src/client/htext_api.c
@@ -92,6 +92,8 @@ static option_entry option_definitions[] = {
{ OT_INT, (option_value) 0 }, /* HTEXTOP_NOHEAD */
{ OT_INT, (option_value) 0 }, /* HTEXTOP_USE_COPY_FROM_SOURCE */
+ { OT_POINTER, (option_value) NULL }, /* HTEXTOP_COPY_DONE */
+ { OT_POINTER, (option_value) NULL }, /* HTEXTOP_COPY_DONE_DATA */
};
/**
@@ -156,6 +158,10 @@ void htext_destroy(htext_handle *handle)
free(handle->error_string);
free(handle->partial_total);
free(handle->partial_done);
+ for (i = 0; i < handle->partials; ++i) {
+ free(handle->partial_rconn[i]);
+ }
+ free(handle->partial_rconn);
for (i = 0; i < HTEXTOP_SENTINEL; ++i) {
if (handle->options[i].type == OT_STRING)
@@ -249,6 +255,7 @@ int htext_perform(htext_handle *handle)
{
void* (*performer)(void *);
const char *performer_str;
+ int i;
/* Check status */
switch (handle->status) {
@@ -263,9 +270,14 @@ int htext_perform(htext_handle *handle)
free(handle->error_string);
free(handle->partial_total);
free(handle->partial_done);
+ for (i = 0; i < handle->partials; ++i) {
+ free(handle->partial_rconn[i]);
+ }
+ free(handle->partial_rconn);
handle->error_string = NULL;
handle->partial_done = NULL;
handle->partial_total = NULL;
+ handle->partial_rconn = NULL;
handle->partials = 0;
/* Check we have source and destination */
@@ -326,7 +338,9 @@ int htext_perform(htext_handle *handle)
void htext_abort(htext_handle *handle)
{
- /* TODO: Really abort */
+ /* TODO: Transfer is aborted by returning non-zero value from progress
+ function (this doesn't happen immediately and with curl easy interface
+ there is no better option except closing directly transfer socket) */
handle->status = HTEXTS_ABORTED;
}
diff --git a/src/plugins/apache-httpd/src/client/htext_common.c b/src/plugins/apache-httpd/src/client/htext_common.c
index 34a28353..219988dc 100644
--- a/src/plugins/apache-httpd/src/client/htext_common.c
+++ b/src/plugins/apache-httpd/src/client/htext_common.c
@@ -40,8 +40,6 @@ void htext_partial_clean(htext_chunk *p)
curl_easy_cleanup(p->curl);
if (p->fd)
GETIO(p->handle) ->close(p->fd);
- if (p->chunk_rconn && *(p->chunk_rconn))
- free(*(p->chunk_rconn));
if (p->error_string)
free(p->error_string);
if (p->http_response)
@@ -219,28 +217,37 @@ int htext_progress_callback(void *pp, curl_off_t dltotal, curl_off_t dlnow,
*(partial->chunk_done) = dlnow;
}
- CURLcode res = CURLE_OK;
- char *ip = NULL;
- long port = 0;
-
- if (res == CURLE_OK)
- res = curl_easy_getinfo(partial->curl, CURLINFO_PRIMARY_IP, &ip);
- if (res == CURLE_OK)
- res = curl_easy_getinfo(partial->curl, CURLINFO_PRIMARY_PORT, &port);
- if (res == CURLE_OK && ip) {
- int len = strlen(ip)+15;
- char *rconn = malloc(len);
- if (strchr(ip, ':')) {
- snprintf(rconn, len, "tcp:[%s]:%ld", ip, port);
- } else {
- snprintf(rconn, len, "tcp:%s:%ld", ip, port);
+ /* Add remote connection only after real data transfer starts
+ otherwise ip:port info could come previous (headnode) connection */
+ if (dltotal > 0 || dlnow > 0 || ultotal > 0 || ulnow > 0) {
+ CURLcode res = CURLE_OK;
+ char *ip = NULL;
+ long port = 0;
+
+ if (res == CURLE_OK)
+ res = curl_easy_getinfo(partial->curl, CURLINFO_PRIMARY_IP, &ip);
+ if (res == CURLE_OK)
+ res = curl_easy_getinfo(partial->curl, CURLINFO_PRIMARY_PORT, &port);
+
+ if (res == CURLE_OK && ip && port) {
+ char rconn[HTEXT_PERF_MARKER_MAX_RCONN_SIZE];
+ if (strchr(ip, ':')) {
+ snprintf(rconn, sizeof(rconn), "tcp:[%s]:%ld", ip, port);
+ } else {
+ snprintf(rconn, sizeof(rconn), "tcp:%s:%ld", ip, port);
+ }
+ htext_log(partial->handle, "connection %s, download %ld/%ld, upload %ld/%ld", rconn, dlnow, dltotal, ulnow, ultotal);
+ if (!*(partial->chunk_rconn))
+ *(partial->chunk_rconn) = calloc(sizeof(char), HTEXT_PERF_MARKER_MAX_RCONN_SIZE);
+ if (strcmp(*(partial->chunk_rconn), rconn))
+ strcpy(*(partial->chunk_rconn), rconn);
}
- htext_log(partial->handle, "connection %s, download %ld/%ld, upload %ld/%ld", rconn, dlnow, dltotal, ulnow, ultotal);
- char *tmp = *(partial->chunk_rconn);
- *(partial->chunk_rconn) = rconn;
- if (tmp) free(tmp);
}
+ /* Let curl abort terminated transfer (doesn't happen immediately) */
+ if (partial->handle->status == HTEXTS_ABORTED)
+ return 1;
+
return 0;
}
diff --git a/src/plugins/apache-httpd/src/client/htext_copy.c b/src/plugins/apache-httpd/src/client/htext_copy.c
index 88c4aacd..0be2f969 100644
--- a/src/plugins/apache-httpd/src/client/htext_copy.c
+++ b/src/plugins/apache-httpd/src/client/htext_copy.c
@@ -83,6 +83,7 @@ static size_t htext_copy_write_callback(char *buffer, size_t size, size_t nmemb,
memset(perf_data, 0, sizeof(*perf_data));
perf_data->index = -1;
perf_data->transferred = -1;
+ perf_data->rconn[0] = '\0';
}
else if (strncasecmp("Timestamp:", p, 10) == 0)
{
@@ -94,15 +95,17 @@ static size_t htext_copy_write_callback(char *buffer, size_t size, size_t nmemb,
}
else if (strncasecmp("Stripe Bytes Transferred:", p, 25) == 0)
{
- perf_data->transferred = atol(p + 26);
+ perf_data->transferred = atol(p + 25);
}
else if (strncasecmp("Total Stripe Count:", p, 19) == 0)
{
- perf_data->count = atoi(p + 20);
+ perf_data->count = atoi(p + 19);
}
else if (strncasecmp("RemoteConnections:", p, 18) == 0)
{
- strcpy(perf_data->rconn, p + 19);
+ char *c = p+18;
+ while (isspace(*c)) c++;
+ snprintf(perf_data->rconn, sizeof(perf_data->rconn), "%s", c);
}
// skip unused dCache perf markers
else if (strncasecmp("State:", p, 6) == 0) {}
@@ -137,8 +140,12 @@ static size_t htext_copy_write_callback(char *buffer, size_t size, size_t nmemb,
handle->partial_done[perf_data->index] = perf_data->transferred;
handle->partial_total[perf_data->index] = 0;
- handle->partial_rconn[perf_data->index] = perf_data->rconn[0] != '\0' ? strdup(perf_data->rconn) : NULL;
-
+ if (perf_data->rconn[0] != '\0') {
+ if (!handle->partial_rconn[perf_data->index])
+ handle->partial_rconn[perf_data->index] = calloc(sizeof(char), HTEXT_PERF_MARKER_MAX_RCONN_SIZE);
+ if (strcmp(handle->partial_rconn[perf_data->index], perf_data->rconn))
+ strcpy(handle->partial_rconn[perf_data->index], perf_data->rconn);
+ }
}
}
@@ -257,8 +264,15 @@ void *htext_copy_method(void *h)
handle->http_status = control.http_status;
+ /* Call transfer done callback */
+ if (GETPTR(handle, HTEXTOP_COPY_DONE)) {
+ void (*copy_done)(htext_handle *, int, void *) = GETPTR(handle, HTEXTOP_COPY_DONE);
+ copy_done(handle, handle->status, GETPTR(handle, HTEXTOP_COPY_DONE_DATA));
+ }
+
/* Clean up */
htext_partial_clean(&control);
curl_easy_cleanup(curl);
+
return NULL ;
}
diff --git a/src/plugins/apache-httpd/src/client/htext_get.c b/src/plugins/apache-httpd/src/client/htext_get.c
index 0fb49bde..ae9b686b 100644
--- a/src/plugins/apache-httpd/src/client/htext_get.c
+++ b/src/plugins/apache-httpd/src/client/htext_get.c
@@ -276,6 +276,13 @@ void *htext_get_method(void *h)
handle->status = HTEXTS_SUCCEEDED;
}
+ /* Call transfer done callback */
+ if (GETPTR(handle, HTEXTOP_COPY_DONE)) {
+ void (*copy_done)(htext_handle *, int, void *) = GETPTR(handle, HTEXTOP_COPY_DONE);
+ copy_done(handle, handle->status, GETPTR(handle, HTEXTOP_COPY_DONE_DATA));
+ }
+
+ /* Clean up */
curl_easy_cleanup(curl);
free(partial_array);
diff --git a/src/plugins/apache-httpd/src/client/htext_private.h b/src/plugins/apache-httpd/src/client/htext_private.h
index 0a406df7..b4369c1e 100644
--- a/src/plugins/apache-httpd/src/client/htext_private.h
+++ b/src/plugins/apache-httpd/src/client/htext_private.h
@@ -113,13 +113,14 @@ typedef struct htext_chunk htext_chunk;
* and unprocessed last line from input data
*/
#define HTEXT_PERF_MARKER_MAX_LINE_SIZE 1024
+#define HTEXT_PERF_MARKER_MAX_RCONN_SIZE 256
struct htext_perf_marker_parsed_data
{
time_t latest; /* Timestamp for received data */
int index; /* Stripe index */
off_t transferred; /* Stripe bytes transferred */
int count; /* Total stripe count */
- char rconn[HTEXT_PERF_MARKER_MAX_LINE_SIZE]; /* Remote connections */
+ char rconn[HTEXT_PERF_MARKER_MAX_RCONN_SIZE]; /* Remote connections */
char remaining[HTEXT_PERF_MARKER_MAX_LINE_SIZE]; /* Remaining input data from unterminated last line */
};
diff --git a/src/plugins/apache-httpd/src/client/htext_put.c b/src/plugins/apache-httpd/src/client/htext_put.c
index 0c3e4c1d..94d8781a 100644
--- a/src/plugins/apache-httpd/src/client/htext_put.c
+++ b/src/plugins/apache-httpd/src/client/htext_put.c
@@ -377,6 +377,12 @@ void *htext_put_method(void *h)
handle->status = HTEXTS_SUCCEEDED;
}
+ /* Call transfer done callback */
+ if (GETPTR(handle, HTEXTOP_COPY_DONE)) {
+ void (*copy_done)(htext_handle *, int, void *) = GETPTR(handle, HTEXTOP_COPY_DONE);
+ copy_done(handle, handle->status, GETPTR(handle, HTEXTOP_COPY_DONE_DATA));
+ }
+
curl_easy_cleanup(curl);
free(partial_array);
diff --git a/src/plugins/apache-httpd/src/mod_lcgdm_disk/copy.c b/src/plugins/apache-httpd/src/mod_lcgdm_disk/copy.c
index dd586da6..cdc22c30 100644
--- a/src/plugins/apache-httpd/src/mod_lcgdm_disk/copy.c
+++ b/src/plugins/apache-httpd/src/mod_lcgdm_disk/copy.c
@@ -42,6 +42,8 @@ typedef struct
request_rec *request;
const char *source;
const char *destination;
+ pthread_mutex_t *done_lock;
+ pthread_cond_t *done_cond;
} dav_disk_copy_data;
/* Custom I/O handler */
@@ -191,6 +193,30 @@ static void dav_disk_copy_log(htext_handle *handle, HTEXT_LOG_TYPE type,
}
}
+/**
+ * Called by htext when transfer is done
+ * @param handle The handle that triggers the call
+ * @param status The size of the message (it might not be NULL terminated)
+ * @param ud User defined data
+ */
+static void dav_disk_copy_done(htext_handle *handle, int status, void *ud)
+{
+ dav_disk_copy_data *ddc = (dav_disk_copy_data*) ud;
+
+ ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, ddc->request,
+ "Remote copy done (%d): %s => %s status %i",
+ htext_http_code(handle), ddc->source, ddc->destination, status);
+
+ // send signal to finish perf marker loop
+ap_log_rerror(APLOG_MARK, APLOG_WARNING, 0, ddc->request, "EEEE Remote copy done (%d): %s => %s status %i", htext_http_code(handle), ddc->source, ddc->destination, status);
+ pthread_mutex_lock(ddc->done_lock);
+ap_log_rerror(APLOG_MARK, APLOG_WARNING, 0, ddc->request, "EEEE Remote copy done (%d): %s => %s status %i", htext_http_code(handle), ddc->source, ddc->destination, status);
+ pthread_cond_signal(ddc->done_cond);
+ap_log_rerror(APLOG_MARK, APLOG_WARNING, 0, ddc->request, "EEEE Remote copy done (%d): %s => %s status %i", htext_http_code(handle), ddc->source, ddc->destination, status);
+ pthread_mutex_unlock(ddc->done_lock);
+ap_log_rerror(APLOG_MARK, APLOG_WARNING, 0, ddc->request, "EEEE Remote copy done (%d): %s => %s status %i", htext_http_code(handle), ddc->source, ddc->destination, status);
+}
+
/**
* Generates and sends the performance clients following GridFTP format
* http://www.ogf.org/documents/GFD.20.pdf (Appendix II), skipping the
@@ -201,14 +223,14 @@ static void dav_disk_send_performance_markers(dav_disk_copy_data *ddc, size_t n,
{
(void) total;
- char buf[80];
+ char buf[256+2]; // HTEXT_PERF_MARKER_MAX_RCONN_SIZE+2
time_t timestamp = time(NULL );
size_t i;
for (i = 0; i < n; ++i) {
buf[0] = '\0';
if (rconn && rconn[i]) {
- snprintf(buf, 78, "\tRemoteConnections: %s", rconn[i]);
+ snprintf(buf, sizeof(buf)-2, "\tRemoteConnections: %s", rconn[i]);
strcat(buf, "\n"); // we reserved space for endl
}
apr_brigade_printf(ddc->brigade, ap_filter_flush, ddc->output,
@@ -228,52 +250,81 @@ static void dav_disk_send_performance_markers(dav_disk_copy_data *ddc, size_t n,
static dav_error *dav_disk_pool_and_feedback(htext_handle *handle,
dav_disk_copy_data *ddc)
{
- int wait, status;
+ int status, interval = 1, done = 0;
dav_error *error = NULL;
const char *error_string;
- do {
- size_t *total, *done, n, i;
- size_t globalDone, globalTotal;
- char **rconn;
+ pthread_mutex_lock(ddc->done_lock);
+ do {
status = htext_status(handle);
- switch (status) {
- case HTEXTS_SUCCEEDED:
- case HTEXTS_FAILED:
- case HTEXTS_ABORTED:
- wait = 0;
- break;
- default:
- /* In the first go we need to set the reply */
- if (ddc->request->status == 0) {
- ddc->request->status = HTTP_ACCEPTED;
- ap_set_content_type(ddc->request, "text/plain");
- }
- /* Print progress */
- htext_progress(handle, &n, &total, &done, &rconn);
- globalDone = globalTotal = 0;
- for (i = 0; i < n; ++i) {
- globalDone += done[i];
- globalTotal += total[i];
- }
-
- dav_disk_send_performance_markers(ddc, n, total, done, rconn);
-
- if (ap_fflush(ddc->output, ddc->brigade) == APR_SUCCESS) {
- ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, ddc->request,
- "COPY '%s' %lu/%lu", ddc->request->uri,
- (unsigned long) globalDone,
- (unsigned long) globalTotal);
- wait = 1;
- sleep(1);
- }
- else {
- wait = 0;
- htext_abort(handle);
- }
+
+ if (status == HTEXTS_SUCCEEDED || status == HTEXTS_FAILED || status == HTEXTS_ABORTED) {
+ done = 1;
+ }
+ else {
+ struct timespec ts;
+ clock_gettime(CLOCK_REALTIME, &ts);
+ ts.tv_sec += interval;
+ interval = 5; // 5s between perf markers except for first one
+
+ int rc = pthread_cond_timedwait(ddc->done_cond, ddc->done_lock, &ts);
+ if (rc == 0) {
+ // transfer done, continue to get new status
+ continue;
+ }
+ else if (rc == ETIMEDOUT) {
+ // ongoing transfer, reached time to print new perfmarker
+ }
+ else {
+ ap_log_rerror(APLOG_MARK, APLOG_WARNING, 0, ddc->request,
+ "Waiting for perf marker condition failed for COPY '%s' rc %i",
+ ddc->request->uri, rc);
+ htext_abort(handle);
+ continue;
+ }
+ }
+
+ /* Always write last performance marker and in defined intervals */
+ {
+ size_t *total, *done, n, i;
+ size_t globalDone, globalTotal;
+ char **rconn;
+
+ /* In the first go we need to set the reply */
+ if (ddc->request->status == 0) {
+ ddc->request->status = HTTP_ACCEPTED;
+ ap_set_content_type(ddc->request, "text/plain");
+ }
+ /* Print progress */
+ htext_progress(handle, &n, &total, &done, &rconn);
+ dav_disk_send_performance_markers(ddc, n, total, done, rconn);
+
+ globalDone = globalTotal = 0;
+ for (i = 0; i < n; ++i) {
+ globalDone += done[i];
+ globalTotal += total[i];
+ }
+
+ if (ap_fflush(ddc->output, ddc->brigade) == APR_SUCCESS) {
+ ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, ddc->request,
+ "COPY '%s' %lu/%lu", ddc->request->uri,
+ (unsigned long) globalDone,
+ (unsigned long) globalTotal);
+ }
+ else {
+ ap_log_rerror(APLOG_MARK, APLOG_WARNING, 0, ddc->request,
+ "Failed to send perf marker for COPY '%s' %lu/%lu",
+ ddc->request->uri,
+ (unsigned long) globalDone,
+ (unsigned long) globalTotal);
+ htext_abort(handle);
+ }
}
- } while (wait);
+
+ } while (!done);
+
+ pthread_mutex_unlock(ddc->done_lock);
error_string = htext_error_string(handle);
switch (status) {
@@ -439,6 +490,8 @@ static dav_error *dav_disk_generic_copy(const dav_resource* res, const char* upr
dav_disk_copy_data ddc;
request_rec* req = res->info->request;
dav_disk_dir_conf* d_conf = res->info->d_conf;
+ pthread_mutex_t done_lock;
+ pthread_cond_t done_cond;
int oldcancel;
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &oldcancel);
@@ -475,6 +528,9 @@ static dav_error *dav_disk_generic_copy(const dav_resource* res, const char* upr
htext_setopt(handle, HTEXTOP_LOW_SPEED_TIME, d_conf->low_speed_time);
htext_setopt(handle, HTEXTOP_LOW_SPEED_LIMIT, d_conf->low_speed_limit);
+ htext_setopt(handle, HTEXTOP_COPY_DONE, dav_disk_copy_done);
+ htext_setopt(handle, HTEXTOP_COPY_DONE_DATA, &ddc);
+
htext_setopt(handle, HTEXTOP_LOGCALLBACK, dav_disk_copy_log);
htext_setopt(handle, HTEXTOP_LOGCALLBACK_DATA, &ddc);
htext_setopt(handle, HTEXTOP_VERBOSITY, 2);
@@ -486,13 +542,23 @@ static dav_error *dav_disk_generic_copy(const dav_resource* res, const char* upr
ddc.request = req;
ddc.source = src;
ddc.destination = dst;
+ ddc.done_lock = &done_lock;
+ ddc.done_cond = &done_cond;
+
+ pthread_mutex_init(&done_lock, NULL);
+ pthread_cond_init(&done_cond, NULL);
/* Run */
if (htext_perform(handle) != 0) {
error = dav_shared_new_error(req, NULL,
HTTP_INTERNAL_SERVER_ERROR, "Could not perform the action: %s",
htext_error_string(handle));
+
+ pthread_cond_destroy(&done_cond);
+ pthread_mutex_destroy(&done_lock);
+
htext_destroy(handle);
+
return error;
}
@@ -504,6 +570,9 @@ static dav_error *dav_disk_generic_copy(const dav_resource* res, const char* upr
/* Finish */
htext_destroy(handle);
+
+ pthread_cond_destroy(&done_cond);
+ pthread_mutex_destroy(&done_lock);
if (!error) {
bkt = apr_bucket_eos_create(ddc.output->c->bucket_alloc);