/*
 * Copyright (C) 2013 - Julien Desfossez <jdesfossez@efficios.com>
 *                      David Goulet <dgoulet@efficios.com>
 *
 * This program is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License, version 2 only, as
 * published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
 * more details.
 *
 * You should have received a copy of the GNU General Public License along with
 * this program; if not, write to the Free Software Foundation, Inc., 51
 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
 */
#include <assert.h>
#include <stdlib.h>

#include <common/common.h>
#include <common/utils.h>

#include "lttng-relayd.h"
30 * Deferred free of a relay index object. MUST only be called by a call RCU.
32 static void deferred_free_relay_index(struct rcu_head
*head
)
34 struct relay_index
*index
=
35 caa_container_of(head
, struct relay_index
, rcu_node
);
37 if (index
->to_close_fd
>= 0) {
40 ret
= close(index
->to_close_fd
);
42 PERROR("Relay index to close fd %d", index
->to_close_fd
);
46 relay_index_free(index
);
50 * Allocate a new relay index object using the given stream ID and sequence
51 * number as the hash table key.
53 * Return allocated object or else NULL on error.
55 struct relay_index
*relay_index_create(uint64_t stream_id
,
58 struct relay_index
*index
;
60 DBG2("Creating relay index with stream id %" PRIu64
" and seqnum %" PRIu64
,
61 stream_id
, net_seq_num
);
63 index
= zmalloc(sizeof(*index
));
65 PERROR("Relay index zmalloc");
69 index
->to_close_fd
= -1;
70 lttng_ht_node_init_two_u64(&index
->index_n
, stream_id
, net_seq_num
);
77 * Find a relayd index in the given hash table.
79 * Return index object or else NULL on error.
81 struct relay_index
*relay_index_find(uint64_t stream_id
, uint64_t net_seq_num
)
83 struct lttng_ht_node_two_u64
*node
;
84 struct lttng_ht_iter iter
;
85 struct lttng_ht_two_u64 key
;
86 struct relay_index
*index
= NULL
;
88 DBG3("Finding index for stream id %" PRIu64
" and seq_num %" PRIu64
,
89 stream_id
, net_seq_num
);
92 key
.key2
= net_seq_num
;
94 lttng_ht_lookup(indexes_ht
, (void *)(&key
), &iter
);
95 node
= lttng_ht_iter_get_node_two_u64(&iter
);
99 index
= caa_container_of(node
, struct relay_index
, index_n
);
102 DBG2("Index %sfound in HT for stream ID %" PRIu64
" and seqnum %" PRIu64
,
103 (index
== NULL
) ? "NOT " : "", stream_id
, net_seq_num
);
108 * Add unique relay index to the given hash table. In case of a collision, the
109 * already existing object is put in the given _index variable.
111 * RCU read side lock MUST be acquired.
113 void relay_index_add(struct relay_index
*index
, struct relay_index
**_index
)
115 struct cds_lfht_node
*node_ptr
;
119 DBG2("Adding relay index with stream id %" PRIu64
" and seqnum %" PRIu64
,
120 index
->key
.key1
, index
->key
.key2
);
122 node_ptr
= cds_lfht_add_unique(indexes_ht
->ht
,
123 indexes_ht
->hash_fct((void *) &index
->index_n
.key
, lttng_ht_seed
),
124 indexes_ht
->match_fct
, (void *) &index
->index_n
.key
,
125 &index
->index_n
.node
);
126 if (node_ptr
!= &index
->index_n
.node
) {
127 *_index
= caa_container_of(node_ptr
, struct relay_index
, index_n
.node
);
132 * Write index on disk to the given fd. Once done error or not, it is removed
133 * from the hash table and destroy the object.
135 * MUST be called with a RCU read side lock held.
137 * Return 0 on success else a negative value.
139 int relay_index_write(int fd
, struct relay_index
*index
)
142 struct lttng_ht_iter iter
;
144 DBG2("Writing index for stream ID %" PRIu64
" and seq num %" PRIu64
145 " on fd %d", index
->key
.key1
, index
->key
.key2
, fd
);
147 /* Delete index from hash table. */
148 iter
.iter
.node
= &index
->index_n
.node
;
149 ret
= lttng_ht_del(indexes_ht
, &iter
);
151 call_rcu(&index
->rcu_node
, deferred_free_relay_index
);
153 return index_write(fd
, &index
->index_data
, sizeof(index
->index_data
));
/*
 * Free the given index.
 *
 * Releases the memory obtained in relay_index_create(). The object is freed
 * immediately, without waiting for a RCU grace period; see
 * relay_index_free_safe() for the deferred variant.
 */
void relay_index_free(struct relay_index *index)
{
	free(index);
}
165 * Safely free the given index using a call RCU.
167 void relay_index_free_safe(struct relay_index
*index
)
173 call_rcu(&index
->rcu_node
, deferred_free_relay_index
);
177 * Delete index from the given hash table.
179 * RCU read side lock MUST be acquired.
181 void relay_index_delete(struct relay_index
*index
)
184 struct lttng_ht_iter iter
;
186 DBG3("Relay index with stream ID %" PRIu64
" and seq num %" PRIu64
187 "deleted.", index
->key
.key1
, index
->key
.key2
);
189 /* Delete index from hash table. */
190 iter
.iter
.node
= &index
->index_n
.node
;
191 ret
= lttng_ht_del(indexes_ht
, &iter
);
196 * Destroy every relay index with the given stream id as part of the key.
198 void relay_index_destroy_by_stream_id(uint64_t stream_id
)
200 struct lttng_ht_iter iter
;
201 struct relay_index
*index
;
204 cds_lfht_for_each_entry(indexes_ht
->ht
, &iter
.iter
, index
, index_n
.node
) {
205 if (index
->key
.key1
== stream_id
) {
206 relay_index_delete(index
);
207 relay_index_free_safe(index
);