diff --git a/src/plugins/apache-httpd/src/client/htext.h b/src/plugins/apache-httpd/src/client/htext.h index 1303e77e..853a146a 100644 --- a/src/plugins/apache-httpd/src/client/htext.h +++ b/src/plugins/apache-httpd/src/client/htext.h @@ -67,6 +67,8 @@ typedef enum HTEXTOP_NOHEAD, /* Disable HEAD requests */ HTEXTOP_USE_COPY_FROM_SOURCE, /* Use Copy Source (fetching) instead of Copy Destination (pushing) */ + HTEXTOP_COPY_DONE, /* If set, called after COPY operation is done */ + HTEXTOP_COPY_DONE_DATA, /* Additional data to pass to transfer done function */ HTEXTOP_SENTINEL, /* To mark the last one */ diff --git a/src/plugins/apache-httpd/src/client/htext_api.c b/src/plugins/apache-httpd/src/client/htext_api.c index 1c3df75c..201a5e37 100644 --- a/src/plugins/apache-httpd/src/client/htext_api.c +++ b/src/plugins/apache-httpd/src/client/htext_api.c @@ -92,6 +92,8 @@ static option_entry option_definitions[] = { { OT_INT, (option_value) 0 }, /* HTEXTOP_NOHEAD */ { OT_INT, (option_value) 0 }, /* HTEXTOP_USE_COPY_FROM_SOURCE */ + { OT_POINTER, (option_value) NULL }, /* HTEXTOP_COPY_DONE */ + { OT_POINTER, (option_value) NULL }, /* HTEXTOP_COPY_DONE_DATA */ }; /** @@ -156,6 +158,10 @@ void htext_destroy(htext_handle *handle) free(handle->error_string); free(handle->partial_total); free(handle->partial_done); + for (i = 0; i < handle->partials; ++i) { + free(handle->partial_rconn[i]); + } + free(handle->partial_rconn); for (i = 0; i < HTEXTOP_SENTINEL; ++i) { if (handle->options[i].type == OT_STRING) @@ -249,6 +255,7 @@ int htext_perform(htext_handle *handle) { void* (*performer)(void *); const char *performer_str; + int i; /* Check status */ switch (handle->status) { @@ -263,9 +270,14 @@ int htext_perform(htext_handle *handle) free(handle->error_string); free(handle->partial_total); free(handle->partial_done); + for (i = 0; i < handle->partials; ++i) { + free(handle->partial_rconn[i]); + } + free(handle->partial_rconn); handle->error_string = NULL; handle->partial_done = NULL; handle->partial_total = NULL; + handle->partial_rconn = NULL; handle->partials = 0; /* Check we have source and destination */ @@ -326,7 +338,9 @@ int htext_perform(htext_handle *handle) void htext_abort(htext_handle *handle) { - /* TODO: Really abort */ + /* TODO: Transfer is aborted by returning non-zero value from progress + function (this doesn't happen immediately and with curl easy interface + there is no better option except closing directly transfer socket) */ handle->status = HTEXTS_ABORTED; } diff --git a/src/plugins/apache-httpd/src/client/htext_common.c b/src/plugins/apache-httpd/src/client/htext_common.c index 34a28353..219988dc 100644 --- a/src/plugins/apache-httpd/src/client/htext_common.c +++ b/src/plugins/apache-httpd/src/client/htext_common.c @@ -40,8 +40,6 @@ void htext_partial_clean(htext_chunk *p) curl_easy_cleanup(p->curl); if (p->fd) GETIO(p->handle) ->close(p->fd); - if (p->chunk_rconn && *(p->chunk_rconn)) - free(*(p->chunk_rconn)); if (p->error_string) free(p->error_string); if (p->http_response) @@ -219,28 +217,37 @@ int htext_progress_callback(void *pp, curl_off_t dltotal, curl_off_t dlnow, *(partial->chunk_done) = dlnow; } - CURLcode res = CURLE_OK; - char *ip = NULL; - long port = 0; - - if (res == CURLE_OK) - res = curl_easy_getinfo(partial->curl, CURLINFO_PRIMARY_IP, &ip); - if (res == CURLE_OK) - res = curl_easy_getinfo(partial->curl, CURLINFO_PRIMARY_PORT, &port); - if (res == CURLE_OK && ip) { - int len = strlen(ip)+15; - char *rconn = malloc(len); - if (strchr(ip, ':')) { - snprintf(rconn, len, "tcp:[%s]:%ld", ip, port); - } else { - snprintf(rconn, len, "tcp:%s:%ld", ip, port); + /* Add remote connection only after real data transfer starts + otherwise ip:port info could come previous (headnode) connection */ + if (dltotal > 0 || dlnow > 0 || ultotal > 0 || ulnow > 0) { + CURLcode res = CURLE_OK; + char *ip = NULL; + long port = 0; + + if (res == CURLE_OK) + res = curl_easy_getinfo(partial->curl, CURLINFO_PRIMARY_IP, &ip); + if (res == CURLE_OK) + res = curl_easy_getinfo(partial->curl, CURLINFO_PRIMARY_PORT, &port); + + if (res == CURLE_OK && ip && port) { + char rconn[HTEXT_PERF_MARKER_MAX_RCONN_SIZE]; + if (strchr(ip, ':')) { + snprintf(rconn, sizeof(rconn), "tcp:[%s]:%ld", ip, port); + } else { + snprintf(rconn, sizeof(rconn), "tcp:%s:%ld", ip, port); + } + htext_log(partial->handle, "connection %s, download %ld/%ld, upload %ld/%ld", rconn, dlnow, dltotal, ulnow, ultotal); + if (!*(partial->chunk_rconn)) + *(partial->chunk_rconn) = calloc(sizeof(char), HTEXT_PERF_MARKER_MAX_RCONN_SIZE); + if (strcmp(*(partial->chunk_rconn), rconn)) + strcpy(*(partial->chunk_rconn), rconn); } - htext_log(partial->handle, "connection %s, download %ld/%ld, upload %ld/%ld", rconn, dlnow, dltotal, ulnow, ultotal); - char *tmp = *(partial->chunk_rconn); - *(partial->chunk_rconn) = rconn; - if (tmp) free(tmp); } + /* Let curl abort terminated transfer (doesn't happen immediately) */ + if (partial->handle->status == HTEXTS_ABORTED) + return 1; + return 0; } diff --git a/src/plugins/apache-httpd/src/client/htext_copy.c b/src/plugins/apache-httpd/src/client/htext_copy.c index 88c4aacd..0be2f969 100644 --- a/src/plugins/apache-httpd/src/client/htext_copy.c +++ b/src/plugins/apache-httpd/src/client/htext_copy.c @@ -83,6 +83,7 @@ static size_t htext_copy_write_callback(char *buffer, size_t size, size_t nmemb, memset(perf_data, 0, sizeof(*perf_data)); perf_data->index = -1; perf_data->transferred = -1; + perf_data->rconn[0] = '\0'; } else if (strncasecmp("Timestamp:", p, 10) == 0) { @@ -94,15 +95,17 @@ static size_t htext_copy_write_callback(char *buffer, size_t size, size_t nmemb, } else if (strncasecmp("Stripe Bytes Transferred:", p, 25) == 0) { - perf_data->transferred = atol(p + 26); + perf_data->transferred = atol(p + 25); } else if (strncasecmp("Total Stripe Count:", p, 19) == 0) { - perf_data->count = atoi(p + 20); + perf_data->count = atoi(p + 19); } else if (strncasecmp("RemoteConnections:", p, 18) == 0) { - strcpy(perf_data->rconn, p + 19); + char *c = p+18; + while (isspace(*c)) c++; + snprintf(perf_data->rconn, sizeof(perf_data->rconn), "%s", c); } // skip unused dCache perf markers else if (strncasecmp("State:", p, 6) == 0) {} @@ -137,8 +140,12 @@ static size_t htext_copy_write_callback(char *buffer, size_t size, size_t nmemb, handle->partial_done[perf_data->index] = perf_data->transferred; handle->partial_total[perf_data->index] = 0; - handle->partial_rconn[perf_data->index] = perf_data->rconn[0] != '\0' ? strdup(perf_data->rconn) : NULL; - + if (perf_data->rconn[0] != '\0') { + if (!handle->partial_rconn[perf_data->index]) + handle->partial_rconn[perf_data->index] = calloc(sizeof(char), HTEXT_PERF_MARKER_MAX_RCONN_SIZE); + if (strcmp(handle->partial_rconn[perf_data->index], perf_data->rconn)) + strcpy(handle->partial_rconn[perf_data->index], perf_data->rconn); + } } } @@ -257,8 +264,15 @@ void *htext_copy_method(void *h) handle->http_status = control.http_status; + /* Call transfer done callback */ + if (GETPTR(handle, HTEXTOP_COPY_DONE)) { + void (*copy_done)(htext_handle *, int, void *) = GETPTR(handle, HTEXTOP_COPY_DONE); + copy_done(handle, handle->status, GETPTR(handle, HTEXTOP_COPY_DONE_DATA)); + } + /* Clean up */ htext_partial_clean(&control); curl_easy_cleanup(curl); + return NULL ; } diff --git a/src/plugins/apache-httpd/src/client/htext_get.c b/src/plugins/apache-httpd/src/client/htext_get.c index 0fb49bde..ae9b686b 100644 --- a/src/plugins/apache-httpd/src/client/htext_get.c +++ b/src/plugins/apache-httpd/src/client/htext_get.c @@ -276,6 +276,13 @@ void *htext_get_method(void *h) handle->status = HTEXTS_SUCCEEDED; } + /* Call transfer done callback */ + if (GETPTR(handle, HTEXTOP_COPY_DONE)) { + void (*copy_done)(htext_handle *, int, void *) = GETPTR(handle, HTEXTOP_COPY_DONE); + copy_done(handle, handle->status, GETPTR(handle, HTEXTOP_COPY_DONE_DATA)); + } + + /* Clean up */ curl_easy_cleanup(curl); free(partial_array); diff --git a/src/plugins/apache-httpd/src/client/htext_private.h b/src/plugins/apache-httpd/src/client/htext_private.h index 0a406df7..b4369c1e 100644 --- a/src/plugins/apache-httpd/src/client/htext_private.h +++ b/src/plugins/apache-httpd/src/client/htext_private.h @@ -113,13 +113,14 @@ typedef struct htext_chunk htext_chunk; * and unprocessed last line from input data */ #define HTEXT_PERF_MARKER_MAX_LINE_SIZE 1024 +#define HTEXT_PERF_MARKER_MAX_RCONN_SIZE 256 struct htext_perf_marker_parsed_data { time_t latest; /* Timestamp for received data */ int index; /* Stripe index */ off_t transferred; /* Stripe bytes transferred */ int count; /* Total stripe count */ - char rconn[HTEXT_PERF_MARKER_MAX_LINE_SIZE]; /* Remote connections */ + char rconn[HTEXT_PERF_MARKER_MAX_RCONN_SIZE]; /* Remote connections */ char remaining[HTEXT_PERF_MARKER_MAX_LINE_SIZE]; /* Remaining input data from unterminated last line */ }; diff --git a/src/plugins/apache-httpd/src/client/htext_put.c b/src/plugins/apache-httpd/src/client/htext_put.c index 0c3e4c1d..94d8781a 100644 --- a/src/plugins/apache-httpd/src/client/htext_put.c +++ b/src/plugins/apache-httpd/src/client/htext_put.c @@ -377,6 +377,12 @@ void *htext_put_method(void *h) handle->status = HTEXTS_SUCCEEDED; } + /* Call transfer done callback */ + if (GETPTR(handle, HTEXTOP_COPY_DONE)) { + void (*copy_done)(htext_handle *, int, void *) = GETPTR(handle, HTEXTOP_COPY_DONE); + copy_done(handle, handle->status, GETPTR(handle, HTEXTOP_COPY_DONE_DATA)); + } + curl_easy_cleanup(curl); free(partial_array); diff --git a/src/plugins/apache-httpd/src/mod_lcgdm_disk/copy.c b/src/plugins/apache-httpd/src/mod_lcgdm_disk/copy.c index dd586da6..cdc22c30 100644 --- a/src/plugins/apache-httpd/src/mod_lcgdm_disk/copy.c +++ b/src/plugins/apache-httpd/src/mod_lcgdm_disk/copy.c @@ -42,6 +42,8 @@ typedef struct request_rec *request; const char *source; const char *destination; + pthread_mutex_t *done_lock; + pthread_cond_t *done_cond; } dav_disk_copy_data; /* Custom I/O handler */ @@ -191,6 +193,30 @@ static void dav_disk_copy_log(htext_handle *handle, HTEXT_LOG_TYPE type, } } +/** + * Called by htext when transfer is done + * @param handle The handle that triggers the call + * @param status The size of the message (it might not be NULL terminated) + * @param ud User defined data + */ +static void dav_disk_copy_done(htext_handle *handle, int status, void *ud) +{ + dav_disk_copy_data *ddc = (dav_disk_copy_data*) ud; + + ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, ddc->request, + "Remote copy done (%d): %s => %s status %i", + htext_http_code(handle), ddc->source, ddc->destination, status); + + // send signal to finish perf marker loop +ap_log_rerror(APLOG_MARK, APLOG_WARNING, 0, ddc->request, "EEEE Remote copy done (%d): %s => %s status %i", htext_http_code(handle), ddc->source, ddc->destination, status); + pthread_mutex_lock(ddc->done_lock); +ap_log_rerror(APLOG_MARK, APLOG_WARNING, 0, ddc->request, "EEEE Remote copy done (%d): %s => %s status %i", htext_http_code(handle), ddc->source, ddc->destination, status); + pthread_cond_signal(ddc->done_cond); +ap_log_rerror(APLOG_MARK, APLOG_WARNING, 0, ddc->request, "EEEE Remote copy done (%d): %s => %s status %i", htext_http_code(handle), ddc->source, ddc->destination, status); + pthread_mutex_unlock(ddc->done_lock); +ap_log_rerror(APLOG_MARK, APLOG_WARNING, 0, ddc->request, "EEEE Remote copy done (%d): %s => %s status %i", htext_http_code(handle), ddc->source, ddc->destination, status); +} + /** * Generates and sends the performance clients following GridFTP format * http://www.ogf.org/documents/GFD.20.pdf (Appendix II), skipping the @@ -201,14 +223,14 @@ static void dav_disk_send_performance_markers(dav_disk_copy_data *ddc, size_t n, { (void) total; - char buf[80]; + char buf[256+2]; // HTEXT_PERF_MARKER_MAX_RCONN_SIZE+2 time_t timestamp = time(NULL ); size_t i; for (i = 0; i < n; ++i) { buf[0] = '\0'; if (rconn && rconn[i]) { - snprintf(buf, 78, "\tRemoteConnections: %s", rconn[i]); + snprintf(buf, sizeof(buf)-2, "\tRemoteConnections: %s", rconn[i]); strcat(buf, "\n"); // we reserved space for endl } apr_brigade_printf(ddc->brigade, ap_filter_flush, ddc->output, @@ -228,52 +250,81 @@ static void dav_disk_send_performance_markers(dav_disk_copy_data *ddc, size_t n, static dav_error *dav_disk_pool_and_feedback(htext_handle *handle, dav_disk_copy_data *ddc) { - int wait, status; + int status, interval = 1, done = 0; dav_error *error = NULL; const char *error_string; - do { - size_t *total, *done, n, i; - size_t globalDone, globalTotal; - char **rconn; + pthread_mutex_lock(ddc->done_lock); + do { status = htext_status(handle); - switch (status) { - case HTEXTS_SUCCEEDED: - case HTEXTS_FAILED: - case HTEXTS_ABORTED: - wait = 0; - break; - default: - /* In the first go we need to set the reply */ - if (ddc->request->status == 0) { - ddc->request->status = HTTP_ACCEPTED; - ap_set_content_type(ddc->request, "text/plain"); - } - /* Print progress */ - htext_progress(handle, &n, &total, &done, &rconn); - globalDone = globalTotal = 0; - for (i = 0; i < n; ++i) { - globalDone += done[i]; - globalTotal += total[i]; - } - - dav_disk_send_performance_markers(ddc, n, total, done, rconn); - - if (ap_fflush(ddc->output, ddc->brigade) == APR_SUCCESS) { - ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, ddc->request, - "COPY '%s' %lu/%lu", ddc->request->uri, - (unsigned long) globalDone, - (unsigned long) globalTotal); - wait = 1; - sleep(1); - } - else { - wait = 0; - htext_abort(handle); - } + + if (status == HTEXTS_SUCCEEDED || status == HTEXTS_FAILED || status == HTEXTS_ABORTED) { + done = 1; + } + else { + struct timespec ts; + clock_gettime(CLOCK_REALTIME, &ts); + ts.tv_sec += interval; + interval = 5; // 5s between perf markers except for first one + + int rc = pthread_cond_timedwait(ddc->done_cond, ddc->done_lock, &ts); + if (rc == 0) { + // transfer done, continue to get new status + continue; + } + else if (rc == ETIMEDOUT) { + // ongoing transfer, reached time to print new perfmarker + } + else { + ap_log_rerror(APLOG_MARK, APLOG_WARNING, 0, ddc->request, + "Waiting for perf marker condition failed for COPY '%s' rc %i", + ddc->request->uri, rc); + htext_abort(handle); + continue; + } + } + + /* Always write last performance marker and in defined intervals */ + { + size_t *total, *done, n, i; + size_t globalDone, globalTotal; + char **rconn; + + /* In the first go we need to set the reply */ + if (ddc->request->status == 0) { + ddc->request->status = HTTP_ACCEPTED; + ap_set_content_type(ddc->request, "text/plain"); + } + /* Print progress */ + htext_progress(handle, &n, &total, &done, &rconn); + dav_disk_send_performance_markers(ddc, n, total, done, rconn); + + globalDone = globalTotal = 0; + for (i = 0; i < n; ++i) { + globalDone += done[i]; + globalTotal += total[i]; + } + + if (ap_fflush(ddc->output, ddc->brigade) == APR_SUCCESS) { + ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, ddc->request, + "COPY '%s' %lu/%lu", ddc->request->uri, + (unsigned long) globalDone, + (unsigned long) globalTotal); + } + else { + ap_log_rerror(APLOG_MARK, APLOG_WARNING, 0, ddc->request, + "Failed to send perf marker for COPY '%s' %lu/%lu", + ddc->request->uri, + (unsigned long) globalDone, + (unsigned long) globalTotal); + htext_abort(handle); + } } - } while (wait); + + } while (!done); + + pthread_mutex_unlock(ddc->done_lock); error_string = htext_error_string(handle); switch (status) { @@ -439,6 +490,8 @@ static dav_error *dav_disk_generic_copy(const dav_resource* res, const char* upr dav_disk_copy_data ddc; request_rec* req = res->info->request; dav_disk_dir_conf* d_conf = res->info->d_conf; + pthread_mutex_t done_lock; + pthread_cond_t done_cond; int oldcancel; pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &oldcancel); @@ -475,6 +528,9 @@ static dav_error *dav_disk_generic_copy(const dav_resource* res, const char* upr htext_setopt(handle, HTEXTOP_LOW_SPEED_TIME, d_conf->low_speed_time); htext_setopt(handle, HTEXTOP_LOW_SPEED_LIMIT, d_conf->low_speed_limit); + htext_setopt(handle, HTEXTOP_COPY_DONE, dav_disk_copy_done); + htext_setopt(handle, HTEXTOP_COPY_DONE_DATA, &ddc); + htext_setopt(handle, HTEXTOP_LOGCALLBACK, dav_disk_copy_log); htext_setopt(handle, HTEXTOP_LOGCALLBACK_DATA, &ddc); htext_setopt(handle, HTEXTOP_VERBOSITY, 2); @@ -486,13 +542,23 @@ static dav_error *dav_disk_generic_copy(const dav_resource* res, const char* upr ddc.request = req; ddc.source = src; ddc.destination = dst; + ddc.done_lock = &done_lock; + ddc.done_cond = &done_cond; + + pthread_mutex_init(&done_lock, NULL); + pthread_cond_init(&done_cond, NULL); /* Run */ if (htext_perform(handle) != 0) { error = dav_shared_new_error(req, NULL, HTTP_INTERNAL_SERVER_ERROR, "Could not perform the action: %s", htext_error_string(handle)); + + pthread_cond_destroy(&done_cond); + pthread_mutex_destroy(&done_lock); + htext_destroy(handle); + return error; } @@ -504,6 +570,9 @@ static dav_error *dav_disk_generic_copy(const dav_resource* res, const char* upr /* Finish */ htext_destroy(handle); + + pthread_cond_destroy(&done_cond); + pthread_mutex_destroy(&done_lock); if (!error) { bkt = apr_bucket_eos_create(ddc.output->c->bucket_alloc);