viewer-stream.h viewer-stream.c \
session.c session.h \
stream.c stream.h \
- stream-fd.c stream-fd.h \
connection.c connection.h \
viewer-session.c viewer-session.h \
tracefile-array.c tracefile-array.h \
$(top_builddir)/src/common/health/libhealth.la \
$(top_builddir)/src/common/config/libconfig.la \
$(top_builddir)/src/common/testpoint/libtestpoint.la \
- $(top_builddir)/src/common/fd-tracker/libfd-tracker.la \
$(top_builddir)/src/lib/lttng-ctl/liblttng-ctl.la
{
int ret = 1;
bool flushed = false;
- int fd;
pthread_mutex_lock(&index->lock);
if (index->flushed) {
if (!index->has_index_data || !index->index_file) {
goto skip;
}
- fd = index->index_file->fd;
- DBG2("Writing index for stream ID %" PRIu64 " and seq num %" PRIu64
- " on fd %d", index->stream->stream_handle,
- index->index_n.key, fd);
+
+ DBG2("Writing index for stream ID %" PRIu64 " and seq num %" PRIu64,
+ index->stream->stream_handle, index->index_n.key);
flushed = true;
index->flushed = true;
ret = lttng_index_file_write(index->index_file, &index->index_data);
rcu_read_lock();
cds_lfht_for_each_entry(stream->indexes_ht->ht, &iter.iter,
index, index_n.node) {
- DBG("Update index to fd %d", stream->index_file->fd);
ret = relay_index_switch_file(index, stream->index_file,
stream->pos_after_last_complete_data_index);
if (ret) {
#include <common/hashtable/hashtable.h>
#include <common/index/index.h>
-#include "stream-fd.h"
-
struct relay_stream;
struct relay_connection;
struct lttcomm_relayd_index;
*/
#define _LGPL_SOURCE
+#include <fcntl.h>
#include <getopt.h>
#include <grp.h>
+#include <inttypes.h>
#include <limits.h>
#include <pthread.h>
#include <signal.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/wait.h>
-#include <inttypes.h>
+#include <unistd.h>
#include <urcu/futex.h>
-#include <urcu/uatomic.h>
#include <urcu/rculist.h>
-#include <unistd.h>
-#include <fcntl.h>
+#include <urcu/uatomic.h>
-#include <lttng/lttng.h>
#include <common/common.h>
+#include <common/compat/endian.h>
#include <common/compat/poll.h>
#include <common/compat/socket.h>
-#include <common/compat/endian.h>
#include <common/defaults.h>
+#include <common/fd-tracker/utils.h>
+#include <common/fs-handle.h>
#include <common/futex.h>
#include <common/index/index.h>
-#include <common/sessiond-comm/sessiond-comm.h>
#include <common/sessiond-comm/inet.h>
#include <common/sessiond-comm/relayd.h>
+#include <common/sessiond-comm/sessiond-comm.h>
#include <common/uri.h>
#include <common/utils.h>
-#include <common/fd-tracker/utils.h>
+#include <lttng/lttng.h>
#include "cmd.h"
+#include "connection.h"
+#include "ctf-trace.h"
+#include "health-relayd.h"
#include "live.h"
#include "lttng-relayd.h"
-#include "utils.h"
-#include "health-relayd.h"
-#include "testpoint.h"
-#include "viewer-stream.h"
-#include "stream.h"
#include "session.h"
-#include "ctf-trace.h"
-#include "connection.h"
+#include "stream.h"
+#include "testpoint.h"
+#include "utils.h"
#include "viewer-session.h"
+#include "viewer-stream.h"
#define SESSION_BUF_DEFAULT_COUNT 16
* overwrite caused by tracefile rotation (in association with
* unlink performed before overwrite).
*/
- if (!vstream->stream_file.fd) {
- int fd;
+ if (!vstream->stream_file.handle) {
char file_path[LTTNG_PATH_MAX];
enum lttng_trace_chunk_status status;
+ struct fs_handle *fs_handle;
ret = utils_stream_file_path(rstream->path_name,
rstream->channel_name, rstream->tracefile_size,
* missing if the stream has been closed (application exits with
* per-pid buffers) and a clear command has been performed.
*/
- status = lttng_trace_chunk_open_file(
+ status = lttng_trace_chunk_open_fs_handle(
vstream->stream_file.trace_chunk,
- file_path, O_RDONLY, 0, &fd, true);
+ file_path, O_RDONLY, 0, &fs_handle, true);
if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
if (status == LTTNG_TRACE_CHUNK_STATUS_NO_FILE &&
rstream->closed) {
PERROR("Failed to open trace file for viewer stream");
goto error_put;
}
- vstream->stream_file.fd = stream_fd_create(fd);
- if (!vstream->stream_file.fd) {
- if (close(fd)) {
- PERROR("Failed to close viewer stream file");
- }
- goto error_put;
- }
+ vstream->stream_file.handle = fs_handle;
}
ret = check_new_streams(conn);
ret = lttng_index_file_read(vstream->index_file, &packet_index);
if (ret) {
- ERR("Relay error reading index file %d",
- vstream->index_file->fd);
+ ERR("Relay error reading index file");
viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_ERR);
goto send_reply;
} else {
uint32_t reply_size = sizeof(reply_header);
uint32_t packet_data_len = 0;
ssize_t read_len;
+ uint64_t stream_id;
DBG2("Relay get data packet");
/* From this point on, the error label can be reached. */
memset(&reply_header, 0, sizeof(reply_header));
+ stream_id = (uint64_t) be64toh(get_packet_info.stream_id);
- vstream = viewer_stream_get_by_id(be64toh(get_packet_info.stream_id));
+ vstream = viewer_stream_get_by_id(stream_id);
if (!vstream) {
DBG("Client requested packet of unknown stream id %" PRIu64,
- (uint64_t) be64toh(get_packet_info.stream_id));
+ stream_id);
reply_header.status = htobe32(LTTNG_VIEWER_GET_PACKET_ERR);
goto send_reply_nolock;
} else {
}
pthread_mutex_lock(&vstream->stream->lock);
- lseek_ret = lseek(vstream->stream_file.fd->fd,
+ lseek_ret = fs_handle_seek(vstream->stream_file.handle,
be64toh(get_packet_info.offset), SEEK_SET);
if (lseek_ret < 0) {
- PERROR("lseek fd %d to offset %" PRIu64,
- vstream->stream_file.fd->fd,
+ PERROR("Failed to seek file system handle of viewer stream %" PRIu64
+ " to offset %" PRIu64,
+ stream_id,
(uint64_t) be64toh(get_packet_info.offset));
goto error;
}
- read_len = lttng_read(vstream->stream_file.fd->fd,
+ read_len = fs_handle_read(vstream->stream_file.handle,
reply + sizeof(reply_header), packet_data_len);
if (read_len < packet_data_len) {
- PERROR("Relay reading trace file, fd: %d, offset: %" PRIu64,
- vstream->stream_file.fd->fd,
+ PERROR("Failed to read from file system handle of viewer stream id %" PRIu64
+ ", offset: %" PRIu64,
+ stream_id,
(uint64_t) be64toh(get_packet_info.offset));
goto error;
}
goto end_free;
}
- DBG("Sent %u bytes for stream %" PRIu64, reply_size,
- (uint64_t) be64toh(get_packet_info.stream_id));
+ DBG("Sent %u bytes for stream %" PRIu64, reply_size, stream_id);
end_free:
free(reply);
int viewer_get_metadata(struct relay_connection *conn)
{
int ret = 0;
+ int fd = -1;
ssize_t read_len;
uint64_t len = 0;
char *data = NULL;
len = vstream->stream->metadata_received - vstream->metadata_sent;
/* first time, we open the metadata file */
- if (!vstream->stream_file.fd) {
- int fd;
+ if (!vstream->stream_file.handle) {
+ struct fs_handle *fs_handle;
char file_path[LTTNG_PATH_MAX];
enum lttng_trace_chunk_status status;
struct relay_stream *rstream = vstream->stream;
* missing if the stream has been closed (application exits with
* per-pid buffers) and a clear command has been performed.
*/
- status = lttng_trace_chunk_open_file(
+ status = lttng_trace_chunk_open_fs_handle(
vstream->stream_file.trace_chunk,
- file_path, O_RDONLY, 0, &fd, true);
+ file_path, O_RDONLY, 0, &fs_handle, true);
if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
if (status == LTTNG_TRACE_CHUNK_STATUS_NO_FILE) {
reply.status = htobe32(LTTNG_VIEWER_NO_NEW_METADATA);
PERROR("Failed to open metadata file for viewer stream");
goto error;
}
- vstream->stream_file.fd = stream_fd_create(fd);
- if (!vstream->stream_file.fd) {
- if (close(fd)) {
- PERROR("Failed to close viewer metadata file");
- }
- goto error;
- }
+ vstream->stream_file.handle = fs_handle;
}
reply.len = htobe64(len);
goto error;
}
- read_len = lttng_read(vstream->stream_file.fd->fd, data, len);
+ fd = fs_handle_get_fd(vstream->stream_file.handle);
+ if (fd < 0) {
+ ERR("Failed to restore viewer stream file system handle");
+ goto error;
+ }
+ read_len = lttng_read(fd, data, len);
+ fs_handle_put_fd(vstream->stream_file.handle);
+ fd = -1;
if (read_len < len) {
PERROR("Relay reading metadata file");
goto error;
+++ /dev/null
-/*
- * Copyright (C) 2015 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
- *
- * This program is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License, version 2 only, as
- * published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
- * more details.
- *
- * You should have received a copy of the GNU General Public License along with
- * this program; if not, write to the Free Software Foundation, Inc., 51
- * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
- */
-
-#define _LGPL_SOURCE
-#include <common/common.h>
-
-#include "stream-fd.h"
-
-struct stream_fd *stream_fd_create(int fd)
-{
- struct stream_fd *sf;
-
- sf = zmalloc(sizeof(*sf));
- if (!sf) {
- goto end;
- }
- urcu_ref_init(&sf->ref);
- sf->fd = fd;
-end:
- return sf;
-}
-
-void stream_fd_get(struct stream_fd *sf)
-{
- urcu_ref_get(&sf->ref);
-}
-
-static void stream_fd_release(struct urcu_ref *ref)
-{
- struct stream_fd *sf = caa_container_of(ref, struct stream_fd, ref);
- int ret;
-
- ret = close(sf->fd);
- if (ret) {
- PERROR("Error closing stream FD %d", sf->fd);
- }
- free(sf);
-}
-
-void stream_fd_put(struct stream_fd *sf)
-{
- if (!sf) {
- return;
- }
- urcu_ref_put(&sf->ref, stream_fd_release);
-}
+++ /dev/null
-#ifndef _STREAM_FD_H
-#define _STREAM_FD_H
-
-/*
- * Copyright (C) 2015 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
- *
- * This program is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License, version 2 only, as
- * published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
- * more details.
- *
- * You should have received a copy of the GNU General Public License along with
- * this program; if not, write to the Free Software Foundation, Inc., 51
- * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
- */
-
-#include <urcu/ref.h>
-
-struct stream_fd {
- int fd;
- struct urcu_ref ref;
-};
-
-struct stream_fd *stream_fd_create(int fd);
-void stream_fd_get(struct stream_fd *sf);
-void stream_fd_put(struct stream_fd *sf);
-
-#endif /* _STREAM_FD_H */
#define _LGPL_SOURCE
#include <common/common.h>
-#include <common/utils.h>
#include <common/defaults.h>
+#include <common/fs-handle.h>
#include <common/sessiond-comm/relayd.h>
-#include <urcu/rculist.h>
+#include <common/utils.h>
#include <sys/stat.h>
+#include <urcu/rculist.h>
#include "lttng-relayd.h"
#include "index.h"
struct relay_stream *stream,
struct lttng_trace_chunk *trace_chunk,
bool force_unlink,
- struct stream_fd **out_stream_fd)
+ struct fs_handle **out_file)
{
- int ret, fd;
+ int ret;
char stream_path[LTTNG_PATH_MAX];
enum lttng_trace_chunk_status status;
const int flags = O_RDWR | O_CREAT | O_TRUNC;
}
}
- status = lttng_trace_chunk_open_file(
- trace_chunk, stream_path, flags, mode, &fd, false);
+ status = lttng_trace_chunk_open_fs_handle(trace_chunk, stream_path,
+ flags, mode, out_file, false);
if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
ERR("Failed to open stream file \"%s\"", stream->channel_name);
ret = -1;
goto end;
}
-
- *out_stream_fd = stream_fd_create(fd);
- if (!*out_stream_fd) {
- if (close(ret)) {
- PERROR("Error closing stream file descriptor %d", ret);
- }
- ret = -1;
- goto end;
- }
end:
return ret;
}
DBG("Rotating stream %" PRIu64 " data file with size %" PRIu64,
stream->stream_handle, stream->tracefile_size_current);
- if (stream->stream_fd) {
- stream_fd_put(stream->stream_fd);
- stream->stream_fd = NULL;
+ if (stream->file) {
+ fs_handle_close(stream->file);
+ stream->file = NULL;
}
stream->tracefile_wrapped_around = false;
stream->tracefile_current_index = 0;
if (stream->ongoing_rotation.value.next_trace_chunk) {
- struct stream_fd *new_stream_fd = NULL;
enum lttng_trace_chunk_status chunk_status;
chunk_status = lttng_trace_chunk_create_subdirectory(
/* Rotate the data file. */
ret = stream_create_data_output_file_from_trace_chunk(stream,
stream->ongoing_rotation.value.next_trace_chunk,
- false, &new_stream_fd);
- stream->stream_fd = new_stream_fd;
+ false, &stream->file);
if (ret < 0) {
ERR("Failed to rotate stream data file");
goto end;
off_t lseek_ret, previous_stream_copy_origin;
uint64_t copy_bytes_left, misplaced_data_size;
bool acquired_reference;
- struct stream_fd *previous_stream_fd = NULL;
+ struct fs_handle *previous_stream_file = NULL;
struct lttng_trace_chunk *previous_chunk = NULL;
if (!LTTNG_OPTIONAL_GET(stream->ongoing_rotation).next_trace_chunk) {
* the orinal stream_fd will be used to copy the "extra" data
* to the new file.
*/
- assert(stream->stream_fd);
- previous_stream_fd = stream->stream_fd;
- stream->stream_fd = NULL;
+ assert(stream->file);
+ previous_stream_file = stream->file;
+ stream->file = NULL;
assert(!stream->is_metadata);
assert(stream->tracefile_size_current >
goto end;
}
- assert(stream->stream_fd);
+ assert(stream->file);
/*
* Seek the current tracefile to the position at which the rotation
* should have occurred.
*/
- lseek_ret = lseek(previous_stream_fd->fd, previous_stream_copy_origin,
- SEEK_SET);
+ lseek_ret = fs_handle_seek(previous_stream_file, previous_stream_copy_origin, SEEK_SET);
if (lseek_ret < 0) {
PERROR("Failed to seek to offset %" PRIu64
" while copying extra data received before a stream rotation",
const off_t copy_size_this_pass = min_t(
off_t, copy_bytes_left, sizeof(copy_buffer));
- io_ret = lttng_read(previous_stream_fd->fd, copy_buffer,
+ io_ret = fs_handle_read(previous_stream_file, copy_buffer,
copy_size_this_pass);
if (io_ret < (ssize_t) copy_size_this_pass) {
if (io_ret == -1) {
PERROR("Failed to read %" PRIu64
- " bytes from fd %i in %s(), returned %zi",
+ " bytes from previous stream file in %s(), returned %zi: stream id = %" PRIu64,
copy_size_this_pass,
- previous_stream_fd->fd,
- __FUNCTION__, io_ret);
+ __FUNCTION__, io_ret,
+ stream->stream_handle);
} else {
ERR("Failed to read %" PRIu64
- " bytes from fd %i in %s(), returned %zi",
+ " bytes from previous stream file in %s(), returned %zi: stream id = %" PRIu64,
copy_size_this_pass,
- previous_stream_fd->fd,
- __FUNCTION__, io_ret);
+ __FUNCTION__, io_ret,
+ stream->stream_handle);
}
ret = -1;
goto end;
}
- io_ret = lttng_write(stream->stream_fd->fd, copy_buffer,
- copy_size_this_pass);
+ io_ret = fs_handle_write(
+ stream->file, copy_buffer, copy_size_this_pass);
if (io_ret < (ssize_t) copy_size_this_pass) {
if (io_ret == -1) {
PERROR("Failed to write %" PRIu64
- " bytes from fd %i in %s(), returned %zi",
+ " bytes from previous stream file in %s(), returned %zi: stream id = %" PRIu64,
copy_size_this_pass,
- stream->stream_fd->fd,
- __FUNCTION__, io_ret);
+ __FUNCTION__, io_ret,
+ stream->stream_handle);
} else {
ERR("Failed to write %" PRIu64
- " bytes from fd %i in %s(), returned %zi",
+ " bytes from previous stream file in %s(), returned %zi: stream id = %" PRIu64,
copy_size_this_pass,
- stream->stream_fd->fd,
- __FUNCTION__, io_ret);
+ __FUNCTION__, io_ret,
+ stream->stream_handle);
}
ret = -1;
goto end;
}
/* Truncate the file to get rid of the excess data. */
- ret = ftruncate(previous_stream_fd->fd, previous_stream_copy_origin);
+ ret = fs_handle_truncate(
+ previous_stream_file, previous_stream_copy_origin);
if (ret) {
PERROR("Failed to truncate current stream file to offset %" PRIu64,
previous_stream_copy_origin);
ret = 0;
end:
lttng_trace_chunk_put(previous_chunk);
- stream_fd_put(previous_stream_fd);
return ret;
}
int ret = 0;
enum lttng_trace_chunk_status status;
bool acquired_reference;
- struct stream_fd *new_stream_fd = NULL;
status = lttng_trace_chunk_create_subdirectory(chunk,
stream->path_name);
assert(acquired_reference);
stream->trace_chunk = chunk;
- if (stream->stream_fd) {
- stream_fd_put(stream->stream_fd);
- stream->stream_fd = NULL;
+ if (stream->file) {
+ fs_handle_close(stream->file);
+ stream->file = NULL;
}
ret = stream_create_data_output_file_from_trace_chunk(stream, chunk,
- false, &new_stream_fd);
- stream->stream_fd = new_stream_fd;
+ false, &stream->file);
end:
return ret;
}
end:
if (ret) {
- if (stream->stream_fd) {
- stream_fd_put(stream->stream_fd);
- stream->stream_fd = NULL;
+ if (stream->file) {
+ fs_handle_close(stream->file);
+ stream->file = NULL;
}
stream_put(stream);
stream = NULL;
stream_unpublish(stream);
- if (stream->stream_fd) {
- stream_fd_put(stream->stream_fd);
- stream->stream_fd = NULL;
+ if (stream->file) {
+ fs_handle_close(stream->file);
+ stream->file = NULL;
}
if (stream->index_file) {
lttng_index_file_put(stream->index_file);
*/
/* Put stream fd before put chunk. */
- if (stream->stream_fd) {
- stream_fd_put(stream->stream_fd);
- stream->stream_fd = NULL;
+ if (stream->file) {
+ fs_handle_close(stream->file);
+ stream->file = NULL;
}
if (stream->index_file) {
lttng_index_file_put(stream->index_file);
ASSERT_LOCKED(stream->lock);
- if (!stream->stream_fd || !stream->trace_chunk) {
+ if (!stream->file || !stream->trace_chunk) {
ERR("Protocol error: received a packet for a stream that doesn't have a current trace chunk: stream_id = %" PRIu64 ", channel_name = %s",
stream->stream_handle, stream->channel_name);
ret = -1;
tracefile_array_file_rotate(stream->tfa, TRACEFILE_ROTATE_WRITE);
stream->tracefile_current_index = new_file_index;
- if (stream->stream_fd) {
- stream_fd_put(stream->stream_fd);
- stream->stream_fd = NULL;
+ if (stream->file) {
+ fs_handle_close(stream->file);
+ stream->file = NULL;
}
ret = stream_create_data_output_file_from_trace_chunk(stream,
- stream->trace_chunk, false, &stream->stream_fd);
+ stream->trace_chunk, false, &stream->file);
if (ret) {
ERR("Failed to perform trace file rotation of stream %" PRIu64,
stream->stream_handle);
memset(padding_buffer, 0,
min(sizeof(padding_buffer), padding_to_write));
- if (!stream->stream_fd || !stream->trace_chunk) {
+ if (!stream->file || !stream->trace_chunk) {
ERR("Protocol error: received a packet for a stream that doesn't have a current trace chunk: stream_id = %" PRIu64 ", channel_name = %s",
stream->stream_handle, stream->channel_name);
ret = -1;
goto end;
}
if (packet) {
- write_ret = lttng_write(stream->stream_fd->fd,
- packet->data, packet->size);
+ write_ret = fs_handle_write(
+ stream->file, packet->data, packet->size);
if (write_ret != packet->size) {
PERROR("Failed to write to stream file of %sstream %" PRIu64,
stream->is_metadata ? "metadata " : "",
const size_t padding_to_write_this_pass =
min(padding_to_write, sizeof(padding_buffer));
- write_ret = lttng_write(stream->stream_fd->fd,
- padding_buffer, padding_to_write_this_pass);
+ write_ret = fs_handle_write(stream->file, padding_buffer,
+ padding_to_write_this_pass);
if (write_ret != padding_to_write_this_pass) {
PERROR("Failed to write padding to file of %sstream %" PRIu64,
stream->is_metadata ? "metadata " : "",
{
ASSERT_LOCKED(stream->lock);
- if (stream->stream_fd) {
- stream_fd_put(stream->stream_fd);
- stream->stream_fd = NULL;
+ if (stream->file) {
+ int ret;
+
+ ret = fs_handle_close(stream->file);
+ if (ret) {
+ ERR("Failed to close stream file handle: channel name = \"%s\", id = %" PRIu64,
+ stream->channel_name,
+ stream->stream_handle);
+ }
+ stream->file = NULL;
}
DBG("%s: reset tracefile_size_current for stream %" PRIu64 " was %" PRIu64,
stream->pos_after_last_complete_data_index = 0;
return stream_create_data_output_file_from_trace_chunk(stream,
- stream->trace_chunk, true, &stream->stream_fd);
+ stream->trace_chunk, true, &stream->file);
}
void print_relay_streams(void)
#include <common/buffer-view.h>
#include "session.h"
-#include "stream-fd.h"
#include "tracefile-array.h"
struct lttcomm_relayd_index;
/* seq num to encounter before closing. */
uint64_t last_net_seq_num;
- /* FD on which to write the stream data. */
- struct stream_fd *stream_fd;
+ struct fs_handle *file;
/* index file on which to write the index data. */
struct lttng_index_file *index_file;
* If we never received a data file for the current stream, delay the
* opening, otherwise open it right now.
*/
- if (stream->stream_fd) {
- int fd, ret;
+ if (stream->file) {
+ int ret;
char file_path[LTTNG_PATH_MAX];
enum lttng_trace_chunk_status status;
goto error_unlock;
}
- status = lttng_trace_chunk_open_file(
- vstream->stream_file.trace_chunk,
- file_path, O_RDONLY, 0, &fd, true);
+ status = lttng_trace_chunk_open_fs_handle(
+ vstream->stream_file.trace_chunk, file_path,
+ O_RDONLY, 0, &vstream->stream_file.handle,
+ true);
if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
goto error_unlock;
}
- vstream->stream_file.fd = stream_fd_create(fd);
- if (!vstream->stream_file.fd) {
- if (close(fd)) {
- PERROR("Failed to close viewer %sfile",
- stream->is_metadata ? "metadata " : "");
- }
- goto error_unlock;
- }
}
if (seek_t == LTTNG_VIEWER_SEEK_LAST && vstream->index_file) {
off_t lseek_ret;
- lseek_ret = lseek(vstream->index_file->fd, 0, SEEK_END);
+ lseek_ret = fs_handle_seek(
+ vstream->index_file->file, 0, SEEK_END);
if (lseek_ret < 0) {
goto error_unlock;
}
viewer_stream_unpublish(vstream);
- if (vstream->stream_file.fd) {
- stream_fd_put(vstream->stream_file.fd);
- vstream->stream_file.fd = NULL;
+ if (vstream->stream_file.handle) {
+ fs_handle_close(vstream->stream_file.handle);
+ vstream->stream_file.handle = NULL;
}
if (vstream->index_file) {
lttng_index_file_put(vstream->index_file);
lttng_index_file_put(vstream->index_file);
vstream->index_file = NULL;
}
- if (vstream->stream_file.fd) {
- stream_fd_put(vstream->stream_file.fd);
- vstream->stream_file.fd = NULL;
+ if (vstream->stream_file.handle) {
+ fs_handle_close(vstream->stream_file.handle);
+ vstream->stream_file.handle = NULL;
}
}
struct relay_stream *stream;
struct {
- /* FD from which to read the stream data. */
- struct stream_fd *fd;
+ struct fs_handle *handle;
struct lttng_trace_chunk *trace_chunk;
} stream_file;
/* index file from which to read the index data. */
libcommon_la_LIBADD = \
$(top_builddir)/src/common/config/libconfig.la \
$(top_builddir)/src/common/compat/libcompat.la \
- $(top_builddir)/src/common/hashtable/libhashtable.la
+ $(top_builddir)/src/common/hashtable/libhashtable.la \
+ $(top_builddir)/src/common/fd-tracker/libfd-tracker.la
if BUILD_LIB_COMPAT
SUBDIRS += compat
* Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
*/
-#include <common/fs-handle.h>
#include <common/fs-handle-internal.h>
+#include <common/fs-handle.h>
+#include <common/readwrite.h>
LTTNG_HIDDEN
int fs_handle_get_fd(struct fs_handle *handle)
{
return handle->close(handle);
}
+
+LTTNG_HIDDEN
+ssize_t fs_handle_read(struct fs_handle *handle, void *buf, size_t count)
+{
+ ssize_t ret;
+ const int fd = fs_handle_get_fd(handle);
+
+ if (fd < 0) {
+ ret = -1;
+ goto end;
+ }
+
+ ret = lttng_read(fd, buf, count);
+ fs_handle_put_fd(handle);
+end:
+ return ret;
+}
+
+LTTNG_HIDDEN
+ssize_t fs_handle_write(struct fs_handle *handle, const void *buf, size_t count)
+{
+ ssize_t ret;
+ const int fd = fs_handle_get_fd(handle);
+
+ if (fd < 0) {
+ ret = -1;
+ goto end;
+ }
+
+ ret = lttng_write(fd, buf, count);
+ fs_handle_put_fd(handle);
+end:
+ return ret;
+}
+
+LTTNG_HIDDEN
+int fs_handle_truncate(struct fs_handle *handle, off_t offset)
+{
+ int ret;
+ const int fd = fs_handle_get_fd(handle);
+
+ if (fd < 0) {
+ ret = -1;
+ goto end;
+ }
+
+ ret = ftruncate(fd, offset);
+ fs_handle_put_fd(handle);
+end:
+ return ret;
+}
+
+LTTNG_HIDDEN
+int fs_handle_seek(struct fs_handle *handle, off_t offset, int whence)
+{
+ int ret;
+ const int fd = fs_handle_get_fd(handle);
+
+ if (fd < 0) {
+ ret = -1;
+ goto end;
+ }
+
+ ret = lseek(fd, offset, whence);
+ fs_handle_put_fd(handle);
+end:
+ return ret;
+}
#define FS_HANDLE_H
#include <common/macros.h>
+#include <stdio.h>
struct fs_handle;
LTTNG_HIDDEN
int fs_handle_close(struct fs_handle *handle);
+LTTNG_HIDDEN
+ssize_t fs_handle_read(struct fs_handle *handle, void *buf, size_t count);
+
+LTTNG_HIDDEN
+ssize_t fs_handle_write(struct fs_handle *handle, const void *buf, size_t count);
+
+LTTNG_HIDDEN
+int fs_handle_truncate(struct fs_handle *handle, off_t offset);
+
+LTTNG_HIDDEN
+int fs_handle_seek(struct fs_handle *handle, off_t offset, int whence);
+
#endif /* FS_HANDLE_H */
{
struct lttng_index_file *index_file;
enum lttng_trace_chunk_status chunk_status;
- int ret, fd = -1;
+ int ret;
+ struct fs_handle *fs_handle = NULL;
ssize_t size_ret;
struct ctf_packet_index_file_hdr hdr;
char index_directory_path[LTTNG_PATH_MAX];
}
}
- chunk_status = lttng_trace_chunk_open_file(chunk, index_file_path,
- flags, mode, &fd, expect_no_file);
+ chunk_status = lttng_trace_chunk_open_fs_handle(chunk, index_file_path,
+ flags, mode, &fs_handle, expect_no_file);
if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
goto error;
}
if (flags == WRITE_FILE_FLAGS) {
ctf_packet_index_file_hdr_init(&hdr, index_major, index_minor);
- size_ret = lttng_write(fd, &hdr, sizeof(hdr));
+ size_ret = fs_handle_write(fs_handle, &hdr, sizeof(hdr));
if (size_ret < sizeof(hdr)) {
PERROR("Failed to write index header");
chunk_status = LTTNG_TRACE_CHUNK_STATUS_ERROR;
} else {
uint32_t element_len;
- size_ret = lttng_read(fd, &hdr, sizeof(hdr));
+ size_ret = fs_handle_read(fs_handle, &hdr, sizeof(hdr));
if (size_ret < 0) {
PERROR("Failed to read index header");
chunk_status = LTTNG_TRACE_CHUNK_STATUS_ERROR;
}
index_file->element_len = element_len;
}
- index_file->fd = fd;
+ index_file->file = fs_handle;
index_file->major = index_major;
index_file->minor = index_minor;
urcu_ref_init(&index_file->ref);
return LTTNG_TRACE_CHUNK_STATUS_OK;
error:
- if (fd >= 0) {
- ret = close(fd);
+ if (fs_handle) {
+ ret = fs_handle_close(fs_handle);
if (ret < 0) {
PERROR("Failed to close file descriptor of index file");
}
int lttng_index_file_write(const struct lttng_index_file *index_file,
const struct ctf_packet_index *element)
{
- int fd;
- size_t len;
ssize_t ret;
+ const size_t len = index_file->element_len;;
assert(index_file);
assert(element);
- fd = index_file->fd;
- len = index_file->element_len;
-
- if (fd < 0) {
+ if (!index_file->file) {
goto error;
}
- ret = lttng_write(fd, element, len);
+ ret = fs_handle_write(index_file->file, element, len);
if (ret < len) {
PERROR("writing index file");
goto error;
struct ctf_packet_index *element)
{
ssize_t ret;
- int fd = index_file->fd;
- size_t len = index_file->element_len;
+ const size_t len = index_file->element_len;
assert(element);
- if (fd < 0) {
+ if (!index_file->file) {
goto error;
}
- ret = lttng_read(fd, element, len);
+ ret = fs_handle_read(index_file->file, element, len);
if (ret < 0) {
PERROR("read index file");
goto error;
struct lttng_index_file *index_file = caa_container_of(ref,
struct lttng_index_file, ref);
- if (close(index_file->fd)) {
+ if (fs_handle_close(index_file->file)) {
PERROR("close index fd");
}
lttng_trace_chunk_put(index_file->trace_chunk);
#include <inttypes.h>
#include <urcu/ref.h>
-#include <common/trace-chunk.h>
#include "ctf-index.h"
+#include <common/fs-handle.h>
+#include <common/trace-chunk.h>
struct lttng_index_file {
- int fd;
+ struct fs_handle *file;
uint32_t major;
uint32_t minor;
uint32_t element_len;
#include <common/dynamic-array.h>
#include <common/error.h>
#include <common/fd-tracker/fd-tracker.h>
+#include <common/fs-handle-internal.h>
#include <common/hashtable/hashtable.h>
#include <common/hashtable/utils.h>
#include <common/optional.h>
struct cds_lfht *ht;
};
+struct fs_handle_untracked {
+ struct fs_handle parent;
+ int fd;
+ struct {
+ struct lttng_directory_handle *directory_handle;
+ char *path;
+ } location;
+};
+
+static
+int fs_handle_untracked_get_fd(struct fs_handle *handle);
+static
+void fs_handle_untracked_put_fd(struct fs_handle *handle);
+static
+int fs_handle_untracked_unlink(struct fs_handle *handle);
+static
+int fs_handle_untracked_close(struct fs_handle *handle);
+
static const
char *close_command_names[] = {
[LTTNG_TRACE_CHUNK_COMMAND_TYPE_MOVE_TO_COMPLETED] =
lttng_trace_chunk_delete_post_release,
};
+static
+struct fs_handle *fs_handle_untracked_create(
+ struct lttng_directory_handle *directory_handle,
+ const char *path,
+ int fd)
+{
+ struct fs_handle_untracked *handle = NULL;
+ bool reference_acquired;
+ char *path_copy = strdup(path);
+
+ assert(fd >= 0);
+ if (!path_copy) {
+ PERROR("Failed to copy file path while creating untracked filesystem handle");
+ goto end;
+ }
+
+ handle = zmalloc(sizeof(typeof(*handle)));
+ if (!handle) {
+ PERROR("Failed to allocate untracked filesystem handle");
+ goto end;
+ }
+
+ handle->parent = (typeof(handle->parent)) {
+ .get_fd = fs_handle_untracked_get_fd,
+ .put_fd = fs_handle_untracked_put_fd,
+ .unlink = fs_handle_untracked_unlink,
+ .close = fs_handle_untracked_close,
+ };
+
+ handle->fd = fd;
+ reference_acquired = lttng_directory_handle_get(directory_handle);
+ assert(reference_acquired);
+ handle->location.directory_handle = directory_handle;
+ /* Ownership is transferred. */
+ handle->location.path = path_copy;
+ path_copy = NULL;
+end:
+ free(path_copy);
+ return handle ? &handle->parent : NULL;
+}
+
+static
+int fs_handle_untracked_get_fd(struct fs_handle *_handle)
+{
+ struct fs_handle_untracked *handle = container_of(
+ _handle, struct fs_handle_untracked, parent);
+
+ return handle->fd;
+}
+
+static
+void fs_handle_untracked_put_fd(struct fs_handle *_handle)
+{
+ /* no-op. */
+}
+
+static
+int fs_handle_untracked_unlink(struct fs_handle *_handle)
+{
+ struct fs_handle_untracked *handle = container_of(
+ _handle, struct fs_handle_untracked, parent);
+
+ return lttng_directory_handle_unlink_file(
+ handle->location.directory_handle,
+ handle->location.path);
+}
+
+static
+void fs_handle_untracked_destroy(struct fs_handle_untracked *handle)
+{
+ lttng_directory_handle_put(handle->location.directory_handle);
+ free(handle->location.path);
+ free(handle);
+}
+
+static
+int fs_handle_untracked_close(struct fs_handle *_handle)
+{
+ struct fs_handle_untracked *handle = container_of(
+ _handle, struct fs_handle_untracked, parent);
+ int ret = close(handle->fd);
+
+ fs_handle_untracked_destroy(handle);
+ return ret;
+}
+
static
bool lttng_trace_chunk_registry_element_equals(
const struct lttng_trace_chunk_registry_element *a,
assert(!ret);
}
-LTTNG_HIDDEN
-enum lttng_trace_chunk_status lttng_trace_chunk_open_file(
- struct lttng_trace_chunk *chunk, const char *file_path,
- int flags, mode_t mode, int *out_fd, bool expect_no_file)
+static
+enum lttng_trace_chunk_status _lttng_trace_chunk_open_fs_handle_locked(
+ struct lttng_trace_chunk *chunk,
+ const char *file_path,
+ int flags,
+ mode_t mode,
+ struct fs_handle **out_handle,
+ bool expect_no_file)
{
int ret;
enum lttng_trace_chunk_status status = LTTNG_TRACE_CHUNK_STATUS_OK;
DBG("Opening trace chunk file \"%s\"", file_path);
- pthread_mutex_lock(&chunk->lock);
if (!chunk->credentials.is_set) {
/*
* Fatal error, credentials must be set before a
if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
goto end;
}
- ret = lttng_directory_handle_open_file_as_user(
- chunk->chunk_directory, file_path, flags, mode,
- chunk->credentials.value.use_current_user ?
- NULL : &chunk->credentials.value.user);
+ if (chunk->fd_tracker) {
+ assert(chunk->credentials.value.use_current_user);
+ *out_handle = fd_tracker_open_fs_handle(chunk->fd_tracker,
+ chunk->chunk_directory, file_path, flags, &mode);
+ ret = *out_handle ? 0 : -1;
+ } else {
+ ret = lttng_directory_handle_open_file_as_user(
+ chunk->chunk_directory, file_path, flags, mode,
+ chunk->credentials.value.use_current_user ?
+ NULL :
+ &chunk->credentials.value.user);
+ if (ret >= 0) {
+ *out_handle = fs_handle_untracked_create(
+ chunk->chunk_directory, file_path, ret);
+ if (!*out_handle) {
+ status = LTTNG_TRACE_CHUNK_STATUS_ERROR;
+ goto end;
+ }
+ }
+ }
if (ret < 0) {
if (errno == ENOENT && expect_no_file) {
status = LTTNG_TRACE_CHUNK_STATUS_NO_FILE;
lttng_trace_chunk_remove_file(chunk, file_path);
goto end;
}
- *out_fd = ret;
end:
+ return status;
+}
+
+LTTNG_HIDDEN
+enum lttng_trace_chunk_status lttng_trace_chunk_open_fs_handle(
+ struct lttng_trace_chunk *chunk,
+ const char *file_path,
+ int flags,
+ mode_t mode,
+ struct fs_handle **out_handle,
+ bool expect_no_file)
+{
+ enum lttng_trace_chunk_status status;
+
+ pthread_mutex_lock(&chunk->lock);
+ status = _lttng_trace_chunk_open_fs_handle_locked(chunk, file_path,
+ flags, mode, out_handle, expect_no_file);
+ pthread_mutex_unlock(&chunk->lock);
+ return status;
+}
+
+LTTNG_HIDDEN
+enum lttng_trace_chunk_status lttng_trace_chunk_open_file(
+ struct lttng_trace_chunk *chunk,
+ const char *file_path,
+ int flags,
+ mode_t mode,
+ int *out_fd,
+ bool expect_no_file)
+{
+ enum lttng_trace_chunk_status status;
+ struct fs_handle *fs_handle;
+
+ pthread_mutex_lock(&chunk->lock);
+ /*
+ * Using this method is never valid when an fd_tracker is being
+ * used since the resulting file descriptor would not be tracked.
+ */
+ assert(!chunk->fd_tracker);
+ status = _lttng_trace_chunk_open_fs_handle_locked(chunk, file_path,
+ flags, mode, &fs_handle, expect_no_file);
pthread_mutex_unlock(&chunk->lock);
+
+ if (status == LTTNG_TRACE_CHUNK_STATUS_OK) {
+ *out_fd = fs_handle_get_fd(fs_handle);
+ /*
+ * Does not close the fd; we just "unbox" it from the fs_handle.
+ */
+ fs_handle_untracked_destroy(container_of(
+ fs_handle, struct fs_handle_untracked, parent));
+ }
+
return status;
}
#ifndef LTTNG_TRACE_CHUNK_H
#define LTTNG_TRACE_CHUNK_H
-#include <common/macros.h>
-#include <common/credentials.h>
#include <common/compat/directory-handle.h>
+#include <common/credentials.h>
+#include <common/fd-tracker/fd-tracker.h>
+#include <common/macros.h>
+#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
-#include <stdbool.h>
/*
* A trace chunk is a group of directories and files forming a (or a set of)
LTTNG_HIDDEN
enum lttng_trace_chunk_status lttng_trace_chunk_open_file(
- struct lttng_trace_chunk *chunk, const char *filename,
- int flags, mode_t mode, int *out_fd, bool expect_no_file);
+ struct lttng_trace_chunk *chunk,
+ const char *filename,
+ int flags,
+ mode_t mode,
+ int *out_fd,
+ bool expect_no_file);
+
+LTTNG_HIDDEN
+enum lttng_trace_chunk_status lttng_trace_chunk_open_fs_handle(
+ struct lttng_trace_chunk *chunk,
+ const char *filename,
+ int flags,
+ mode_t mode,
+ struct fs_handle **out_handle,
+ bool expect_no_file);
LTTNG_HIDDEN
int lttng_trace_chunk_unlink_file(struct lttng_trace_chunk *chunk,