diff --git a/README.md b/README.md index d9ce246..107ed88 100644 --- a/README.md +++ b/README.md @@ -3,17 +3,20 @@ Stream files to and from iRODS. tears either reads from stdin and writes a file to iRODS or reads a file from iRODS and writes to stdout. Basic usage for writing is: -file_making_program | tears -w /path/to/irods/file +`file_making_program | tears -w /path/to/irods/file` or for reading: -tears /path/to/irods/file | file_receiving_program +`tears /path/to/irods/file | file_receiving_program` Two things to note. Firstly, tears will try to pick the best iRODS host to read or write from. This can cause authentication problems and can be switched off by using the -d option. Secondly, the iRODS file can be in the form of a URI (proposed [here](https://github.com/samtools/htslib/issues/229)). The URI is of the form: -irods://[irodsUserName%23irodsZone@][irodsHost][:irodsPort]/collection_path/data_object - +_irods://[irodsUserName%23irodsZone@][irodsHost][:irodsPort]/collection_path/data_object_ +### Build +`autoreconf -i -f` +`./configure --with-irods` +`make` diff --git a/configure.ac b/configure.ac index d0ea791..b0064bb 100644 --- a/configure.ac +++ b/configure.ac @@ -1,6 +1,6 @@ dnl -*- Autoconf -*- AC_COPYRIGHT([ -Copyright (c) 2015-2016, 2018 Genome Research Ltd. +Copyright (c) 2015-2016, 2018-2019 Genome Research Ltd. Author: Andrew Whitwham This file is part of tears. @@ -20,7 +20,7 @@ dnl Process this file with autoconf to produce a configure script. AC_PREREQ([2.68]) -AC_INIT([tears], [1.2.4], [aw7+github@sanger.ac.uk], [tears], [https://github.com/whitwham/tears]) +AC_INIT([tears], [1.3], [aw7+github@sanger.ac.uk], [tears], [https://github.com/whitwham/tears]) AC_USE_SYSTEM_EXTENSIONS AM_INIT_AUTOMAKE([-Wall foreign]) diff --git a/tears.c b/tears.c index 17dafdb..99c8564 100644 --- a/tears.c +++ b/tears.c @@ -1,5 +1,5 @@ /* -* Copyright (c) 2015-2016, 2018 Genome Research Ltd. +* Copyright (c) 2015-2016, 2018-2019 Genome Research Ltd. 
* * Author: Andrew Whitwham * @@ -34,6 +34,8 @@ #include #include #include +#include +#include #define DEFAULT_BUFFER_SIZE 1048576 @@ -58,6 +60,7 @@ void usage_and_exit(char *pname, int exit_code) { fprintf(stdout, "\t-v\t\tverbose mode\n"); fprintf(stdout, "\t-d\t\tuse default server\n"); fprintf(stdout, "\t-f\t\tforce overwrite of existing file on iRODS\n"); + fprintf(stdout, "\t-t minutes\tclose and reopen irods object periodically\n"); fprintf(stdout, "\t-h\t\tprint this help\n\n"); fprintf(stdout, "Version: %s Author: %s\n", PACKAGE_STRING, PACKAGE_BUGREPORT); fprintf(stdout, "Github: %s\n", PACKAGE_URL); @@ -88,6 +91,7 @@ void print_irods_error(char *msg, rErrMsg_t *err) { void error_and_exit(rcComm_t *c, const char *msg, ...) { + int ret; va_list argp; va_start(argp, msg); @@ -95,13 +99,25 @@ void error_and_exit(rcComm_t *c, const char *msg, ...) { va_end(argp); if (c) { - rcDisconnect(c); + if ((ret = rcDisconnect(c))) { + fprintf(stderr, "Error: rcDisconnect returned %d\n", ret); + } } exit(EXIT_FAILURE); } +void print_time(const char *prefix) { + char date[20]; + time_t now = time(NULL); + struct tm *tm = localtime(&now); + + strftime(date, 20, "%Y-%m-%d-%H:%M:%S", tm); + fprintf(stderr, "%s%s\n", prefix, date); +} + + int irods_uri_check(char *uri, rodsEnv *env, int verb) { char *user = NULL; char *zone = NULL; @@ -207,12 +223,13 @@ int irods_uri_check(char *uri, rodsEnv *env, int verb) { void setup_dataObjInp(tears_context_t* ctx) { memset(&ctx->data_obj, 0, sizeof(ctx->data_obj)); rstrcpy(ctx->data_obj.objPath, ctx->obj_path, MAX_NAME_LEN); + if (ctx->write_to_irods) { ctx->data_obj.openFlags = O_WRONLY; - } - else { + } else { ctx->data_obj.openFlags = O_RDONLY; } + ctx->data_obj.dataSize = 0; if (ctx->force_write) { @@ -220,33 +237,41 @@ void setup_dataObjInp(tears_context_t* ctx) { } } + int connect_to_server( rcComm_t** conn, const tears_context_t* ctx) { + rErrMsg_t err_msg; *conn = rcConnect(ctx->irods_env.rodsHost, ctx->irods_env.rodsPort, 
ctx->irods_env.rodsUserName, ctx->irods_env.rodsZone, 0, &err_msg); + if (!*conn) { return err_msg.status; } int status = clientLogin(*conn, "", ""); + if (status < 0) { - rcDisconnect(*conn); error_and_exit(*conn, "Error: clientLogin failed with status %d:%s\n", status, get_irods_error_name(status, ctx->verbose)); } + return 0; } + int choose_server(tears_context_t* ctx) { char* new_host = NULL; rcComm_t* conn = NULL; int status = connect_to_server(&conn, ctx); + int ret; + if (status < 0) { return status; } + if (ctx->write_to_irods) { if ((status = rcGetHostForPut(conn, &ctx->data_obj, &new_host)) < 0) { fprintf(stderr, "Error: rcGetHostForPut failed with status %d:%s\n", status, get_irods_error_name(status, ctx->verbose)); @@ -258,15 +283,22 @@ int choose_server(tears_context_t* ctx) { return status; } } + if (ctx->verbose) { fprintf(stderr, "Chosen server is: %s\n", new_host); } + rstrcpy(ctx->irods_env.rodsHost, new_host, NAME_LEN); - rcDisconnect(conn); + + if ((ret = rcDisconnect(conn))) { + fprintf(stderr, "Warning: rcDisconnect returned %d\n", ret); + } + free(new_host); return 0; } + int open_or_create_data_object( rcComm_t* conn, tears_context_t* ctx, @@ -286,12 +318,17 @@ int open_or_create_data_object( dataObjLseekInp.whence = SEEK_SET; dataObjLseekInp.l1descInx = open_fd; dataObjLseekInp.offset = offset_in_bytes; + + if (ctx->verbose) { + fprintf(stderr, "Seeking to %ld bytes\n", offset_in_bytes); + } + int status = 0; - if (status = rcDataObjLseek(conn, &dataObjLseekInp, &dataObjLseekOut) < 0) { + + if ((status = rcDataObjLseek(conn, &dataObjLseekInp, &dataObjLseekOut)) < 0) { fprintf(stderr, "Error: rcDataObjLseek in file failed with status %d:%s\n", open_fd, get_irods_error_name(open_fd, ctx->verbose)); return status; - } - else if (dataObjLseekOut) { + } else if (dataObjLseekOut) { free(dataObjLseekOut); } } @@ -299,14 +336,17 @@ int open_or_create_data_object( return open_fd; } + int create_data_object( rcComm_t* conn, tears_context_t* ctx) { 
int open_fd = open_or_create_data_object(conn, ctx, 0, rcDataObjCreate); + if (open_fd < 0) { error_and_exit(conn, "Error: Creating file failed with status %d:%s\n", open_fd, get_irods_error_name(open_fd, ctx->verbose)); } + return open_fd; } @@ -317,18 +357,22 @@ int open_data_object( const unsigned long offset_in_bytes) { int open_fd = open_or_create_data_object(conn, ctx, offset_in_bytes, rcDataObjOpen); + if (open_fd < 0) { error_and_exit(conn, "Error: Opening file failed with status %d:%s\n", open_fd, get_irods_error_name(open_fd, ctx->verbose)); } + return open_fd; } + int is_status_connection_failure(const int status) { return (SYS_HEADER_READ_LEN_ERR == status || (status <= SYS_SOCK_READ_ERR && status >= SYS_SOCK_READ_ERR - 999)); } + int reset_stream_for_retry( rcComm_t** conn, tears_context_t* ctx, @@ -338,13 +382,13 @@ int reset_stream_for_retry( if (status < 0) { fprintf(stderr, "Reconnection failed with status %d.\n", status); } - fseek(stream, offset, SEEK_SET); + return open_data_object(*conn, ctx, offset); } + int main (int argc, char **argv) { rcComm_t *conn = NULL; - rErrMsg_t err_msg; openedDataObjInp_t open_obj; int open_fd; @@ -358,7 +402,16 @@ int main (int argc, char **argv) { memset(&ctx, 0, sizeof(ctx)); ctx.buf_size = DEFAULT_BUFFER_SIZE; - while ((opt = getopt(argc, argv, "b:vhrdwf")) != -1) { + time_t current, timeout_time; + unsigned int timeout_period = 0; + int ret; + int retrying = 0; + + bytesBuf_t data_buffer; + long read_in; + long written_out; + + while ((opt = getopt(argc, argv, "b:vhrdwft:")) != -1) { switch (opt) { case 'b': ctx.buf_size = atoi(optarg); @@ -389,6 +442,16 @@ int main (int argc, char **argv) { ctx.force_write = 1; break; + case 't': + timeout_period = atoi(optarg); + + if (timeout_period < 0) { + error_and_exit(conn, "Error: time-out period must not be negative.\n"); + } + + timeout_period *= 60; // convert to seconds + break; + case 'h': usage_and_exit(argv[0], EXIT_SUCCESS); break; @@ -399,6 +462,10 @@ int 
main (int argc, char **argv) { } } + if (ctx.verbose) { + print_time("START "); + } + if (optind >= argc) { fprintf(stderr, "Error: Missing iRODS file.\n"); usage_and_exit(argv[0], EXIT_FAILURE); @@ -441,28 +508,34 @@ int main (int argc, char **argv) { if (!ctx.server_set) { status = choose_server(&ctx); + if (status < 0) { error_and_exit(conn, "Error: choosing server failed with status %d\n", status, get_irods_error_name(status, ctx.verbose)); } } status = connect_to_server(&conn, &ctx); + if (status < 0) { error_and_exit(conn, "Error: failed connecting to server with status %d\n", status); } if (ctx.write_to_irods) { open_fd = create_data_object(conn, &ctx); - } - else { + } else { open_fd = open_data_object(conn, &ctx, total_written); } + if (timeout_period) { + timeout_time = time(NULL) + timeout_period; + + if (ctx.verbose) { + fprintf(stderr, "Setting time-out to %ds\n", timeout_period); + } + } + // the read/write loop while (1) { - bytesBuf_t data_buffer; - long read_in; - long written_out; // set up common data elements memset(&open_obj, 0, sizeof(open_obj)); @@ -471,17 +544,23 @@ int main (int argc, char **argv) { // time to read something if (ctx.write_to_irods) { - read_in = fread(buffer, 1, ctx.buf_size, stdin); + if (!retrying) { + read_in = fread(buffer, 1, ctx.buf_size, stdin); + } + open_obj.len = read_in; data_buffer.len = open_obj.len; - } - else { + } else { open_obj.len = ctx.buf_size; data_buffer.len = open_obj.len; if ((read_in = rcDataObjRead(conn, &open_obj, &data_buffer)) < 0) { - if (is_status_connection_failure(read_in)) { + if (is_status_connection_failure(read_in) && !retrying) { + + fprintf(stderr, "Warning: irods connection failure, retrying.\n"); + open_fd = reset_stream_for_retry(&conn, &ctx, stdout, total_written); + retrying = 1; continue; } error_and_exit(conn, "Error: rcDataObjRead failed with status %ld:%s\n", read_in, get_irods_error_name(read_in, ctx.verbose)); @@ -494,14 +573,18 @@ int main (int argc, char **argv) { if 
(!read_in) break; - // now try and write something + // now try to write something if (ctx.write_to_irods) { open_obj.len = read_in; data_buffer.len = open_obj.len; if ((written_out = rcDataObjWrite(conn, &open_obj, &data_buffer)) < 0) { - if (is_status_connection_failure(written_out)) { + if (is_status_connection_failure(written_out) && !retrying) { + + fprintf(stderr, "Warning: irods connection failure, retrying.\n"); + open_fd = reset_stream_for_retry(&conn, &ctx, stdin, total_written); + retrying = 1; continue; } error_and_exit(conn, "Error: rcDataObjWrite failed with status %ld\n", written_out, get_irods_error_name(written_out, ctx.verbose)); @@ -519,17 +602,67 @@ int main (int argc, char **argv) { if (read_in != written_out) { error_and_exit(conn, "Error: write fail %ld written, should be %ld.\n", written_out, read_in); } + + if (timeout_period) { + current = time(NULL); + + if (current > timeout_time) { // close object and reopen for writing again + + if (ctx.verbose) { + print_time("TIME-OUT AT "); + } + + errno = 0; + + if ((status = rcDataObjClose(conn, &open_obj)) < 0) { + error_and_exit(conn, "Error: rcDataObjClose failed with status %d:%s errno %d\n", status, get_irods_error_name(status, ctx.verbose), errno); + } + + + errno = 0; + + if ((status = open_data_object(conn, &ctx, total_written)) < 0) { + error_and_exit(conn, "Error: open_data_object failed with status %d:%s errno %d\n", status, get_irods_error_name(status, ctx.verbose), errno); + } + + if (ctx.verbose) { + fprintf(stderr, "Reopen at position %ld\n", total_written); + print_time("RESTART "); + } + + timeout_time = time(NULL) + timeout_period; + } + } + + retrying = 0; } if (ctx.verbose) { fprintf(stderr, "Total bytes written %ld\n", total_written); } - if ((status = rcDataObjClose(conn, &open_obj)) < 0) { - error_and_exit(conn, "Error: rcDataObjClose failed with status %d:%s\n", status, get_irods_error_name(status, ctx.verbose)); + if (ctx.verbose) { + print_time("CLOSING "); + } + + errno 
= 0; + + status = rcDataObjClose(conn, &open_obj); + + if (ctx.verbose) { + print_time("CLOSED "); + } + + if (status < 0) { + error_and_exit(conn, "Error: rcDataObjClose failed with status %d:%s errno %d\n", status, get_irods_error_name(status, ctx.verbose), errno); + } + + ret = rcDisconnect(conn); + + if (ctx.verbose) { + fprintf(stderr, "main: Disconnect return %d\n", ret); } - rcDisconnect(conn); free(buffer); exit(EXIT_SUCCESS); }