diff --git a/plugins/in_tail/tail.c b/plugins/in_tail/tail.c index 82bf1dc69f1..344f1e8340e 100644 --- a/plugins/in_tail/tail.c +++ b/plugins/in_tail/tail.c @@ -78,7 +78,7 @@ static int in_tail_collect_pending(struct flb_input_instance *ins, if (file->watch_fd == -1 || (file->offset >= file->size)) { /* Gather current file size */ - ret = fstat(file->fd, &st); + ret = flb_tail_file_stat(file, &st); if (ret == -1) { flb_errno(); flb_tail_file_remove(file); @@ -343,7 +343,7 @@ int in_tail_collect_event(void *file, struct flb_config *config) struct stat st; struct flb_tail_file *f = file; - ret = fstat(f->fd, &st); + ret = flb_tail_file_stat(f, &st); if (ret == -1) { flb_tail_file_remove(f); return 0; @@ -619,6 +619,13 @@ static struct flb_config_map config_map[] = { FLB_CONFIG_MAP_INT, "progress_check_interval_nsec", "0", 0, FLB_TRUE, offsetof(struct flb_tail_config, progress_check_interval_nsec), }, + { + FLB_CONFIG_MAP_STR, "fstat_interval", "250ms", + 0, FLB_FALSE, 0, + "interval for fstat mode event polling. Controls how often files are checked " + "for changes when using stat-based file watching (instead of inotify). " + "Default is 250ms. Supports time suffixes: s, ms, us, ns." + }, { FLB_CONFIG_MAP_TIME, "rotate_wait", FLB_TAIL_ROTATE_WAIT, 0, FLB_TRUE, offsetof(struct flb_tail_config, rotate_wait), @@ -719,7 +726,14 @@ static struct flb_config_map config_map[] = { 0, FLB_TRUE, offsetof(struct flb_tail_config, skip_empty_lines), "Allows to skip empty lines." }, - + { + FLB_CONFIG_MAP_BOOL, "keep_file_handle", "true", + 0, FLB_TRUE, offsetof(struct flb_tail_config, keep_file_handle), + "When set to false, the file handle will be reopened every time we read " + "from the source tailed file and closed when done, to avoid keeping it open. " + "Useful for SMB shares and network filesystems where keeping handles open " + "can cause issues." + }, { FLB_CONFIG_MAP_BOOL, "truncate_long_lines", "false", 0, FLB_TRUE, offsetof(struct flb_tail_config, truncate_long_lines), diff --git a/plugins/in_tail/tail_config.c b/plugins/in_tail/tail_config.c index 2778b7af548..cdcb6b8c577 100644 --- a/plugins/in_tail/tail_config.c +++ b/plugins/in_tail/tail_config.c @@ -106,6 +106,8 @@ struct flb_tail_config *flb_tail_config_create(struct flb_input_instance *ins, ctx->ins = ins; ctx->ignore_older = 0; ctx->skip_long_lines = FLB_FALSE; + ctx->keep_file_handle = FLB_TRUE; /* default: keep file handle open */ + ctx->fstat_interval_nsec = 250000000; /* default: 250ms */ #ifdef FLB_HAVE_SQLDB ctx->db_sync = 1; /* sqlite sync 'normal' */ #endif @@ -189,6 +191,47 @@ struct flb_tail_config *flb_tail_config_create(struct flb_input_instance *ins, } } + /* Config: fstat mode event polling interval */ + tmp = flb_input_get_property("fstat_interval", ins); + if (tmp) { + /* Support suffixes: s, ms, us, ns; also allow plain seconds and fractional seconds */ + char *end = NULL; + double val = strtod(tmp, &end); + uint64_t mult = 1000000000ULL; /* default: seconds */ + + if (end != NULL && *end != '\0') { + if (strcasecmp(end, "s") == 0) mult = 1000000000ULL; + else if (strcasecmp(end, "ms") == 0) mult = 1000000ULL; + else if (strcasecmp(end, "us") == 0) mult = 1000ULL; + else if (strcasecmp(end, "ns") == 0) mult = 1ULL; + else { + flb_plg_error(ctx->ins, "invalid 'fstat_interval' unit in value (%s)", tmp); + flb_tail_config_destroy(ctx); + return NULL; + } + } + + if (val <= 0) { + flb_plg_error(ctx->ins, "invalid 'fstat_interval' value (%s)", tmp); + flb_tail_config_destroy(ctx); + return NULL; + } + + /* Convert to nanoseconds with clamping to reasonable bounds */ + double nsec_d = val * (double) mult; + if (nsec_d < 1.0) { + flb_plg_error(ctx->ins, "'fstat_interval' too small (%s)", tmp); + flb_tail_config_destroy(ctx); + return NULL; + } + ctx->fstat_interval_nsec = (uint64_t) nsec_d; + + if (ctx->fstat_interval_nsec <= 1000000ULL) { + flb_plg_warn(ctx->ins, "very low fstat_interval (%" PRIu64 " ns) may cause high CPU usage", + ctx->fstat_interval_nsec); + } + } + /* Config: seconds interval to monitor file after rotation */ if (ctx->rotate_wait <= 0) { flb_plg_error(ctx->ins, "invalid 'rotate_wait' config value"); diff --git a/plugins/in_tail/tail_config.h b/plugins/in_tail/tail_config.h index 00615448fac..b50fa389073 100644 --- a/plugins/in_tail/tail_config.h +++ b/plugins/in_tail/tail_config.h @@ -107,6 +107,10 @@ struct flb_tail_config { int progress_check_interval; /* watcher interval */ int progress_check_interval_nsec; /* watcher interval */ + uint64_t fstat_interval_nsec; /* fstat mode event polling interval (nanoseconds) */ + + int keep_file_handle; /* keep file handle open during tail (default: true) */ + #ifdef FLB_HAVE_INOTIFY int inotify_watcher; /* enable/disable inotify monitor */ #endif diff --git a/plugins/in_tail/tail_file.c b/plugins/in_tail/tail_file.c index 1624bd94c7a..32db1b4f11e 100644 --- a/plugins/in_tail/tail_file.c +++ b/plugins/in_tail/tail_file.c @@ -58,6 +58,65 @@ static inline void consume_bytes(char *buf, int bytes, int length) memmove(buf, buf + bytes, length - bytes); } +/* Ensure file handle is open (opens if closed). Returns 0 on success, FLB_TAIL_ERROR on failure. */ +int flb_tail_file_ensure_open_handle(struct flb_tail_file *file) +{ + int fd; + + /* If already open, nothing to do */ + if (file->fd != -1) { + return 0; + } + + fd = open(file->name, O_RDONLY); + if (fd == -1) { + flb_errno(); + flb_plg_error(file->config->ins, "cannot open %s", file->name); + return FLB_TAIL_ERROR; + } + + /* Seek to the current offset */ + if (file->offset > 0) { + int64_t ret = lseek(fd, file->offset, SEEK_SET); + if (ret == -1) { + flb_errno(); + close(fd); + return FLB_TAIL_ERROR; + } + } + + file->fd = fd; + return 0; +} + +/* Get file status using stat() if handle is closed, fstat() if open. Returns 0 on success, -1 on error. */ +int flb_tail_file_stat(struct flb_tail_file *file, struct stat *st) +{ + if (file->fd == -1) { + return stat(file->name, st); + } + else { + return fstat(file->fd, st); + } +} + +/* Close file handle unconditionally */ +void flb_tail_file_close_handle(struct flb_tail_file *file) +{ + if (file->fd != -1) { + close(file->fd); + file->fd = -1; + } +} + +/* Close file handle during tail if it's open and keep_file_handle is false */ +void flb_tail_file_close_handle_during_tail(struct flb_tail_file *file) +{ + if (file->config->keep_file_handle == FLB_FALSE && file->fd != -1) { + flb_tail_file_close_handle(file); + } +} + static uint64_t stat_get_st_dev(struct stat *st) { #ifdef FLB_SYSTEM_WINDOWS @@ -1390,6 +1449,12 @@ int flb_tail_file_append(char *path, struct stat *st, int mode, /* Remaining bytes to read */ file->pending_bytes = file->size - file->offset; + /* Close file handle if keep_file_handle is false */ + if (ctx->keep_file_handle == FLB_FALSE) { + flb_plg_debug(ctx->ins, "tail_append: file will be read without keeping file handle opened %s", file->name); + flb_tail_file_close_handle_during_tail(file); + } + #ifdef FLB_HAVE_METRICS name = (char *) flb_input_name(ctx->ins); ts = cfl_time_now(); @@ -1485,9 +1550,8 @@ void flb_tail_file_remove(struct flb_tail_file *file) mk_list_del(&file->_head); flb_tail_fs_remove(ctx, file); - /* avoid deleting file with -1 fd */ if (file->fd != -1) { - close(file->fd); + flb_tail_file_close_handle(file); } if (file->tag_buf) { flb_free(file->tag_buf); @@ -1543,7 +1607,7 @@ static int adjust_counters(struct flb_tail_config *ctx, struct flb_tail_file *fi int64_t offset; struct stat st; - ret = fstat(file->fd, &st); + ret = flb_tail_file_stat(file, &st); if (ret == -1) { flb_errno(); return FLB_TAIL_ERROR; @@ -1556,15 +1620,22 @@ static int adjust_counters(struct flb_tail_config *ctx, struct flb_tail_file *fi /* Check if the file was truncated by comparing current size with previous size */ if (size_delta < 0) { - offset = lseek(file->fd, 0, SEEK_SET); - if (offset == -1) { - flb_errno(); - return FLB_TAIL_ERROR; + /* If keeping handle open, it's already open but at wrong offset - seek to beginning */ + if (ctx->keep_file_handle == FLB_TRUE) { + offset = lseek(file->fd, 0, SEEK_SET); + if (offset == -1) { + flb_errno(); + return FLB_TAIL_ERROR; + } + file->offset = offset; + } + else { + /* If not keeping handle open, just update offset - handle will be opened/seeks correctly later */ + file->offset = 0; } flb_plg_debug(ctx->ins, "adjust_counters: inode=%"PRIu64" file truncated %s (diff: %"PRId64" bytes)", file->inode, file->name, size_delta); - file->offset = offset; file->buf_len = 0; /* Update offset in the database file */ @@ -1605,6 +1676,12 @@ int flb_tail_file_chunk(struct flb_tail_file *file) return FLB_TAIL_BUSY; } + /* Open file handle if it's closed */ + ret = flb_tail_file_ensure_open_handle(file); + if (ret != 0) { + return ret; + } + file_buffer_capacity = (file->buf_size - file->buf_len) - 1; stream_data_length = 0; @@ -1640,6 +1717,7 @@ int flb_tail_file_chunk(struct flb_tail_file *file) if (ctx->skip_long_lines == FLB_FALSE) { flb_plg_error(ctx->ins, "file=%s requires a larger buffer size, " "lines are too long. Skipping file.", file->name); + flb_tail_file_close_handle_during_tail(file); return FLB_TAIL_ERROR; } @@ -1673,6 +1751,7 @@ int flb_tail_file_chunk(struct flb_tail_file *file) flb_errno(); flb_plg_error(ctx->ins, "cannot increase buffer size for %s, " "skipping file.", file->name); + flb_tail_file_close_handle_during_tail(file); return FLB_TAIL_ERROR; } } @@ -1722,7 +1801,7 @@ int flb_tail_file_chunk(struct flb_tail_file *file) flb_plg_error(ctx->ins, "decompression buffer resize failed for %s.", file->name); - + flb_tail_file_close_handle_during_tail(file); return FLB_TAIL_ERROR; } @@ -1761,7 +1840,7 @@ int flb_tail_file_chunk(struct flb_tail_file *file) flb_plg_error(ctx->ins, "decompression failed for %s.", file->name); - + flb_tail_file_close_handle_during_tail(file); return FLB_TAIL_ERROR; } @@ -1794,6 +1873,7 @@ int flb_tail_file_chunk(struct flb_tail_file *file) if (ret < 0) { flb_plg_debug(ctx->ins, "inode=%"PRIu64" file=%s process content ERROR", file->inode, file->name); + flb_tail_file_close_handle_during_tail(file); return FLB_TAIL_ERROR; } @@ -1813,12 +1893,19 @@ int flb_tail_file_chunk(struct flb_tail_file *file) /* adjust file counters, returns FLB_TAIL_OK or FLB_TAIL_ERROR */ ret = adjust_counters(ctx, file); + /* Close file handle if keep_file_handle is false */ + flb_tail_file_close_handle_during_tail(file); + /* Data was consumed but likely some bytes still remain */ return ret; } else if (raw_data_length == 0) { /* We reached the end of file, let's wait for some incoming data */ ret = adjust_counters(ctx, file); + + /* Close file handle if keep_file_handle is false */ + flb_tail_file_close_handle_during_tail(file); + if (ret == FLB_TAIL_OK) { return FLB_TAIL_WAIT; } @@ -1830,9 +1917,16 @@ int flb_tail_file_chunk(struct flb_tail_file *file) /* error */ flb_errno(); flb_plg_error(ctx->ins, "error reading %s", file->name); + + /* Close file handle if keep_file_handle is false */ + flb_tail_file_close_handle_during_tail(file); + return FLB_TAIL_ERROR; } + /* Close file handle if keep_file_handle is false */ + flb_tail_file_close_handle_during_tail(file); + return FLB_TAIL_ERROR; } @@ -1918,7 +2012,7 @@ int flb_tail_file_to_event(struct flb_tail_file *file) struct flb_tail_config *ctx = file->config; /* Check if the file promoted have pending bytes */ - ret = fstat(file->fd, &st); + ret = flb_tail_file_stat(file, &st); if (ret != 0) { flb_errno(); return -1; @@ -1932,10 +2026,12 @@ int flb_tail_file_to_event(struct flb_tail_file *file) file->pending_bytes = 0; } - /* Check if the file has been rotated */ - ret = flb_tail_file_is_rotated(ctx, file); - if (ret == FLB_TRUE) { - flb_tail_file_rotated(file); + /* Check if the file has been rotated (only when keep_file_handle is enabled) */ + if (ctx->keep_file_handle == FLB_TRUE) { + ret = flb_tail_file_is_rotated(ctx, file); + if (ret == FLB_TRUE) { + flb_tail_file_rotated(file); + } } /* Notify the fs-event handler that we will start monitoring this 'file' */ @@ -1959,10 +2055,13 @@ int flb_tail_file_to_event(struct flb_tail_file *file) } /* - * Given an open file descriptor, return the filename. This function is a - * bit slow and it aims to be used only when a file is rotated. + * Internal implementation: Given an open file descriptor, return the filename. + * This function is a bit slow and it aims to be used only when a file is rotated. + * + * This is used to detect the new file path after an open handle has been + * rotated/moved. Requires an open file descriptor. */ -char *flb_tail_file_name(struct flb_tail_file *file) +static char *flb_tail_file_name_internal(struct flb_tail_file *file) { int ret; char *buf; @@ -2055,6 +2154,41 @@ char *flb_tail_file_name(struct flb_tail_file *file) return buf; } +/* + * Public wrapper: Get the file name from a file descriptor. + * This function handles opening/closing the file handle as needed. + * If the handle is closed, opens it temporarily and closes it after getting the name. + * + * Note: When keep_file_handle is false, this function still needs to work for + * resolving symlinks during file initialization, but it should NOT be used for + * log rotation detection (which requires persistent open handles). + */ +char *flb_tail_file_name(struct flb_tail_file *file) +{ + int ret; + int fd_was_opened = FLB_FALSE; + char *result; + + /* If handle is closed, open it temporarily */ + if (file->fd == -1) { + ret = flb_tail_file_ensure_open_handle(file); + if (ret != 0) { + return NULL; + } + fd_was_opened = FLB_TRUE; + } + + /* Call the internal implementation */ + result = flb_tail_file_name_internal(file); + + /* Close handle if we opened it in this function */ + if (fd_was_opened) { + flb_tail_file_close_handle(file); + } + + return result; +} + int flb_tail_file_name_dup(char *path, struct flb_tail_file *file) { file->name = flb_strdup(path); @@ -2156,9 +2290,9 @@ static int check_purge_deleted_file(struct flb_tail_config *ctx, int64_t mtime; struct stat st; - ret = fstat(file->fd, &st); + ret = flb_tail_file_stat(file, &st); if (ret == -1) { - flb_plg_debug(ctx->ins, "error stat(2) %s, removing", file->name); + flb_plg_debug(ctx->ins, "purge: error stat(2) %s, removing", file->name); flb_tail_file_remove(file); return FLB_TRUE; } @@ -2210,7 +2344,7 @@ int flb_tail_file_purge(struct flb_input_instance *ins, mk_list_foreach_safe(head, tmp, &ctx->files_rotated) { file = mk_list_entry(head, struct flb_tail_file, _rotate_head); if ((file->rotated + ctx->rotate_wait) <= now) { - ret = fstat(file->fd, &st); + ret = flb_tail_file_stat(file, &st); if (ret == 0) { flb_plg_debug(ctx->ins, "inode=%"PRIu64" purge rotated file %s " \ diff --git a/plugins/in_tail/tail_file.h b/plugins/in_tail/tail_file.h index 46b250394fd..aeaffefff4e 100644 --- a/plugins/in_tail/tail_file.h +++ b/plugins/in_tail/tail_file.h @@ -118,6 +118,10 @@ static inline int flb_tail_target_file_name_cmp(char *name, int flb_tail_file_name_dup(char *path, struct flb_tail_file *file); int flb_tail_file_to_event(struct flb_tail_file *file); int flb_tail_file_chunk(struct flb_tail_file *file); +int flb_tail_file_ensure_open_handle(struct flb_tail_file *file); +int flb_tail_file_stat(struct flb_tail_file *file, struct stat *st); +void flb_tail_file_close_handle(struct flb_tail_file *file); +void flb_tail_file_close_handle_during_tail(struct flb_tail_file *file); int flb_tail_file_append(char *path, struct stat *st, int mode, ssize_t offset, struct flb_tail_config *ctx); diff --git a/plugins/in_tail/tail_fs_inotify.c b/plugins/in_tail/tail_fs_inotify.c index eee9babd343..18852fac9b2 100644 --- a/plugins/in_tail/tail_fs_inotify.c +++ b/plugins/in_tail/tail_fs_inotify.c @@ -215,7 +215,7 @@ static int tail_fs_event(struct flb_input_instance *ins, flb_tail_fs_add_rotated(file); } - ret = fstat(file->fd, &st); + ret = flb_tail_file_stat(file, &st); if (ret == -1) { flb_plg_debug(ins, "inode=%"PRIu64" error stat(2) %s, removing", file->inode, file->name); @@ -257,15 +257,22 @@ static int tail_fs_event(struct flb_input_instance *ins, */ if (size_delta < 0) { - offset = lseek(file->fd, 0, SEEK_SET); - if (offset == -1) { - flb_errno(); - return -1; + /* If keeping handle open, it's already open but at wrong offset - seek to beginning */ + if (ctx->keep_file_handle == FLB_TRUE) { + offset = lseek(file->fd, 0, SEEK_SET); + if (offset == -1) { + flb_errno(); + return -1; + } + file->offset = offset; + } + else { + /* If not keeping handle open, just update offset - handle will be opened/seeks correctly later */ + file->offset = 0; } flb_plg_debug(ctx->ins, "tail_fs_event: inode=%"PRIu64" file truncated %s (diff: %"PRId64" bytes)", file->inode, file->name, size_delta); - file->offset = offset; file->buf_len = 0; /* Update offset in the database file */ @@ -327,7 +334,7 @@ static int in_tail_progress_check_callback(struct flb_input_instance *ins, continue; } - ret = fstat(file->fd, &st); + ret = flb_tail_file_stat(file, &st); if (ret == -1) { flb_errno(); flb_plg_error(ins, "fstat error"); diff --git a/plugins/in_tail/tail_fs_stat.c b/plugins/in_tail/tail_fs_stat.c index 36eead51464..2018b92ad99 100644 --- a/plugins/in_tail/tail_fs_stat.c +++ b/plugins/in_tail/tail_fs_stat.c @@ -62,7 +62,7 @@ static int tail_fs_event(struct flb_input_instance *ins, fst = file->fs_backend; /* Check current status of the file */ - ret = fstat(file->fd, &st); + ret = flb_tail_file_stat(file, &st); if (ret == -1) { flb_errno(); continue; @@ -99,9 +99,9 @@ static int tail_fs_check(struct flb_input_instance *ins, file = mk_list_entry(head, struct flb_tail_file, _head); fst = file->fs_backend; - ret = fstat(file->fd, &st); + ret = flb_tail_file_stat(file, &st); if (ret == -1) { - flb_plg_debug(ctx->ins, "error stat(2) %s, removing", file->name); + flb_plg_debug(ctx->ins, "check: error stat(2) %s, removing", file->name); flb_tail_file_remove(file); continue; } @@ -126,15 +126,22 @@ static int tail_fs_check(struct flb_input_instance *ins, /* Check if the file was truncated */ if (size_delta < 0) { - offset = lseek(file->fd, 0, SEEK_SET); - if (offset == -1) { - flb_errno(); - return -1; + /* If keeping handle open, it's already open but at wrong offset - seek to beginning */ + if (ctx->keep_file_handle == FLB_TRUE) { + offset = lseek(file->fd, 0, SEEK_SET); + if (offset == -1) { + flb_errno(); + return -1; + } + file->offset = offset; + } + else { + /* If not keeping handle open, just update offset - handle will be opened/seeks correctly later */ + file->offset = 0; } flb_plg_debug(ctx->ins, "tail_fs_check: file truncated %s (diff: %"PRId64" bytes)", file->name, size_delta); - file->offset = offset; file->buf_len = 0; memcpy(&fst->st, &st, sizeof(struct stat)); @@ -154,6 +161,17 @@ static int tail_fs_check(struct flb_input_instance *ins, file->pending_bytes = 0; } + /* + * Skip rotation detection when keep_file_handle is false. + * Rotation detection requires persistent open handles to work reliably. + * Without keeping handles open, calling flb_tail_file_name() would + * unnecessarily open and close handles multiple times per check cycle. + * Because we are not keeping a handle, a rotation would be interpreted + * as a truncation and handled by the truncation management logic. + */ + if (ctx->keep_file_handle == FLB_FALSE) { + continue; + } /* Discover the current file name for the open file descriptor */ name = flb_tail_file_name(file); @@ -176,7 +194,6 @@ static int tail_fs_check(struct flb_input_instance *ins, flb_tail_file_rotated(file); } flb_free(name); - } return 0; @@ -190,9 +207,12 @@ int flb_tail_fs_stat_init(struct flb_input_instance *in, flb_plg_debug(ctx->ins, "flb_tail_fs_stat_init() initializing stat tail input"); - /* Set a manual timer to collect events every 0.250 seconds */ + /* Set a manual timer to collect events using configured interval */ + /* Convert nanoseconds to seconds and nanoseconds for the API */ ret = flb_input_set_collector_time(in, tail_fs_event, - 0, 250000000, config); + (int)(ctx->fstat_interval_nsec / 1000000000L), + (long)(ctx->fstat_interval_nsec % 1000000000L), + config); if (ret < 0) { return -1; } diff --git a/tests/runtime/in_tail.c b/tests/runtime/in_tail.c index 2b1ebb5228a..a1273eca8c7 100644 --- a/tests/runtime/in_tail.c +++ b/tests/runtime/in_tail.c @@ -1385,6 +1385,262 @@ void flb_test_in_tail_multiline_json_and_regex() } } +void flb_test_keep_file_handle_false() +{ + struct flb_lib_out_cb cb_data; + struct test_tail_ctx *ctx; + char *file[] = {"keep_file_handle_test.log"}; + char *msg1 = "first message"; + char *msg2 = "second message"; + char *msg3 = "third message"; + int ret; + int num; + int unused; + + clear_output_num(); + + cb_data.cb = cb_count_msgpack; + cb_data.data = &unused; + + ctx = test_tail_ctx_create(&cb_data, &file[0], sizeof(file)/sizeof(char*), FLB_TRUE); + if (!TEST_CHECK(ctx != NULL)) { + TEST_MSG("test_ctx_create failed"); + exit(EXIT_FAILURE); + } + + ret = flb_input_set(ctx->flb, ctx->i_ffd, + "path", file[0], + "keep_file_handle", "false", + "read_from_head", "true", + NULL); + TEST_CHECK(ret == 0); + + ret = flb_output_set(ctx->flb, ctx->o_ffd, + NULL); + TEST_CHECK(ret == 0); + + /* Start the engine */ + ret = flb_start(ctx->flb); + TEST_CHECK(ret == 0); + + /* Write first message */ + ret = write_msg(ctx, msg1, strlen(msg1)); + if (!TEST_CHECK(ret > 0)) { + test_tail_ctx_destroy(ctx); + exit(EXIT_FAILURE); + } + + /* waiting to flush */ + flb_time_msleep(500); + + /* Write second message */ + ret = write_msg(ctx, msg2, strlen(msg2)); + if (!TEST_CHECK(ret > 0)) { + test_tail_ctx_destroy(ctx); + exit(EXIT_FAILURE); + } + + /* waiting to flush */ + flb_time_msleep(500); + + /* Write third message */ + ret = write_msg(ctx, msg3, strlen(msg3)); + if (!TEST_CHECK(ret > 0)) { + test_tail_ctx_destroy(ctx); + exit(EXIT_FAILURE); + } + + /* waiting to flush */ + flb_time_msleep(500); + + num = get_output_num(); + if (!TEST_CHECK(num == 3)) { + TEST_MSG("output num error. expect=3 got=%d (keep_file_handle=false should read all messages)", num); + } + + test_tail_ctx_destroy(ctx); +} + +void flb_test_keep_file_handle_true() +{ + struct flb_lib_out_cb cb_data; + struct test_tail_ctx *ctx; + char *file[] = {"keep_file_handle_true_test.log"}; + char *msg1 = "first message"; + char *msg2 = "second message"; + char *msg3 = "third message"; + int ret; + int num; + int unused; + + clear_output_num(); + + cb_data.cb = cb_count_msgpack; + cb_data.data = &unused; + + ctx = test_tail_ctx_create(&cb_data, &file[0], sizeof(file)/sizeof(char*), FLB_TRUE); + if (!TEST_CHECK(ctx != NULL)) { + TEST_MSG("test_ctx_create failed"); + exit(EXIT_FAILURE); + } + + ret = flb_input_set(ctx->flb, ctx->i_ffd, + "path", file[0], + "keep_file_handle", "true", + "read_from_head", "true", + NULL); + TEST_CHECK(ret == 0); + + ret = flb_output_set(ctx->flb, ctx->o_ffd, + NULL); + TEST_CHECK(ret == 0); + + /* Start the engine */ + ret = flb_start(ctx->flb); + TEST_CHECK(ret == 0); + + /* Write first message */ + ret = write_msg(ctx, msg1, strlen(msg1)); + if (!TEST_CHECK(ret > 0)) { + test_tail_ctx_destroy(ctx); + exit(EXIT_FAILURE); + } + + /* waiting to flush */ + flb_time_msleep(500); + + /* Write second message */ + ret = write_msg(ctx, msg2, strlen(msg2)); + if (!TEST_CHECK(ret > 0)) { + test_tail_ctx_destroy(ctx); + exit(EXIT_FAILURE); + } + + /* waiting to flush */ + flb_time_msleep(500); + + /* Write third message */ + ret = write_msg(ctx, msg3, strlen(msg3)); + if (!TEST_CHECK(ret > 0)) { + test_tail_ctx_destroy(ctx); + exit(EXIT_FAILURE); + } + + /* waiting to flush */ + flb_time_msleep(500); + + num = get_output_num(); + if (!TEST_CHECK(num == 3)) { + TEST_MSG("output num error. expect=3 got=%d (keep_file_handle=true should read all messages)", num); + } + + test_tail_ctx_destroy(ctx); +} + +void flb_test_keep_file_handle_false_truncation() +{ + struct flb_lib_out_cb cb_data; + struct test_tail_ctx *ctx; + char *file[] = {"keep_file_handle_truncation_test.log"}; + char *msg1 = "first message before truncation"; + char *msg2 = "second message before truncation"; + char *msg3 = "first message after truncation"; + char *msg4 = "second message after truncation"; + int ret; + int num; + int unused; + + clear_output_num(); + + cb_data.cb = cb_count_msgpack; + cb_data.data = &unused; + + ctx = test_tail_ctx_create(&cb_data, &file[0], sizeof(file)/sizeof(char*), FLB_TRUE); + if (!TEST_CHECK(ctx != NULL)) { + TEST_MSG("test_ctx_create failed"); + exit(EXIT_FAILURE); + } + + ret = flb_input_set(ctx->flb, ctx->i_ffd, + "path", file[0], + "keep_file_handle", "false", + "read_from_head", "true", + NULL); + TEST_CHECK(ret == 0); + + ret = flb_output_set(ctx->flb, ctx->o_ffd, + NULL); + TEST_CHECK(ret == 0); + + /* Start the engine */ + ret = flb_start(ctx->flb); + TEST_CHECK(ret == 0); + + /* Write first message before truncation */ + ret = write_msg(ctx, msg1, strlen(msg1)); + if (!TEST_CHECK(ret > 0)) { + test_tail_ctx_destroy(ctx); + exit(EXIT_FAILURE); + } + + /* waiting to flush */ + flb_time_msleep(500); + + /* Write second message before truncation */ + ret = write_msg(ctx, msg2, strlen(msg2)); + if (!TEST_CHECK(ret > 0)) { + test_tail_ctx_destroy(ctx); + exit(EXIT_FAILURE); + } + + /* waiting to flush */ + flb_time_msleep(500); + + /* Verify we got the first 2 messages */ + num = get_output_num(); + if (!TEST_CHECK(num == 2)) { + TEST_MSG("output num error before truncation. expect=2 got=%d", num); + } + + /* Truncate the file to 0 bytes */ + ret = ftruncate(ctx->fds[0], 0); + if (!TEST_CHECK(ret == 0)) { + TEST_MSG("ftruncate failed"); + test_tail_ctx_destroy(ctx); + exit(EXIT_FAILURE); + } + fsync(ctx->fds[0]); + flb_time_msleep(500); + + /* Write first message after truncation */ + ret = write_msg(ctx, msg3, strlen(msg3)); + if (!TEST_CHECK(ret > 0)) { + test_tail_ctx_destroy(ctx); + exit(EXIT_FAILURE); + } + + /* waiting to flush */ + flb_time_msleep(500); + + /* Write second message after truncation */ + ret = write_msg(ctx, msg4, strlen(msg4)); + if (!TEST_CHECK(ret > 0)) { + test_tail_ctx_destroy(ctx); + exit(EXIT_FAILURE); + } + + /* waiting to flush */ + flb_time_msleep(500); + + /* Verify we got all 4 messages (2 before + 2 after truncation) */ + num = get_output_num(); + if (!TEST_CHECK(num == 4)) { + TEST_MSG("output num error after truncation. expect=4 got=%d (keep_file_handle=false should detect truncation and read from beginning)", num); + } + + test_tail_ctx_destroy(ctx); +} + void flb_test_path_comma() { struct flb_lib_out_cb cb_data; @@ -1407,7 +1663,7 @@ void flb_test_path_comma() exit(EXIT_FAILURE); } - ret = flb_input_set(ctx->flb, ctx->o_ffd, + ret = flb_input_set(ctx->flb, ctx->i_ffd, "path", path, NULL); TEST_CHECK(ret == 0); @@ -2658,6 +2914,9 @@ TEST_LIST = { {"skip_long_lines", flb_test_in_tail_skip_long_lines}, {"truncate_long_lines", flb_test_in_tail_truncate_long_lines}, {"truncate_long_lines_utf8", flb_test_in_tail_truncate_long_lines_utf8}, + {"keep_file_handle_false", flb_test_keep_file_handle_false}, + {"keep_file_handle_true", flb_test_keep_file_handle_true}, + {"keep_file_handle_false_truncation", flb_test_keep_file_handle_false_truncation}, {"path_comma", flb_test_path_comma}, {"path_key", flb_test_path_key}, {"exclude_path", flb_test_exclude_path},