summary |
shortlog |
log |
commit | commitdiff |
tree
raw |
patch |
inline | side by side (from parent 1: 98b5ff3)
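Coverity reports a lock inversion scenario in
handle_notification_thread_client_disconnect() where a client's lock is
held while acquiring the client list lock. This is indeed a problem: the
client lock is meant to nest within the client list lock, so acquiring
the two in the opposite order can deadlock.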
As indicated in the notification_client and notification_client_list
comments, the locking was shoe-horned to make it possible for the action
executor to enqueue notifications in a client's outgoing queue and flush
it.
Since this is the only access pattern that is supported, the client
locking is reworked slightly to only acquire the client lock when
checking the "active" flag, interacting with the outbound communication
state, and sending through a client's socket.
This change narrows the client locking regions, which accounts for the
somewhat large number of lines affected.
The updates to the `active` flag on error are moved to the function that
flushes the outbound queue instead of expecting the callers to set it.
This allows the locking to be limited to this function rather than
relying on the callers.
Signed-off-by: Jérémie Galarneau <jeremie.galarneau@efficios.com>
Change-Id: I8632d0f7785ec727dabd329bdfba010fd5e4643a
struct action_executor *executor = user_data;
bool update_communication = true;
struct action_executor *executor = user_data;
bool update_communication = true;
- ASSERT_LOCKED(client->lock);
-
switch (status) {
case CLIENT_TRANSMISSION_STATUS_COMPLETE:
DBG("Successfully sent full notification to client, client_id = %" PRIu64,
switch (status) {
case CLIENT_TRANSMISSION_STATUS_COMPLETE:
DBG("Successfully sent full notification to client, client_id = %" PRIu64,
case CLIENT_TRANSMISSION_STATUS_FAIL:
DBG("Communication error occurred while sending notification to client, client_id = %" PRIu64,
client->id);
case CLIENT_TRANSMISSION_STATUS_FAIL:
DBG("Communication error occurred while sending notification to client, client_id = %" PRIu64,
client->id);
- client->communication.active = false;
break;
default:
ERR("Fatal error encoutered while sending notification to client, client_id = %" PRIu64,
client->id);
break;
default:
ERR("Fatal error encoutered while sending notification to client, client_id = %" PRIu64,
client->id);
- client->communication.active = false;
+ /* Safe to read client's id without locking as it is immutable. */
ret = notification_thread_client_communication_update(
executor->notification_thread_handle, client->id,
status);
ret = notification_thread_client_communication_update(
executor->notification_thread_handle, client->id,
status);
client_id);
ret = 0;
} else {
client_id);
ret = 0;
} else {
- pthread_mutex_lock(&client->lock);
ret = client_handle_transmission_status(
client, client_status, state);
ret = client_handle_transmission_status(
client, client_status, state);
- pthread_mutex_unlock(&client->lock);
}
rcu_read_unlock();
break;
}
rcu_read_unlock();
break;
-/* Client lock must be acquired by caller. */
static
int client_reset_inbound_state(struct notification_client *client)
{
int ret;
static
int client_reset_inbound_state(struct notification_client *client)
{
int ret;
- ASSERT_LOCKED(client->lock);
-
ret = lttng_dynamic_buffer_set_size(
&client->communication.inbound.buffer, 0);
assert(!ret);
ret = lttng_dynamic_buffer_set_size(
&client->communication.inbound.buffer, 0);
assert(!ret);
pthread_mutex_init(&client->lock, NULL);
client->id = state->next_notification_client_id++;
CDS_INIT_LIST_HEAD(&client->condition_list);
pthread_mutex_init(&client->lock, NULL);
client->id = state->next_notification_client_id++;
CDS_INIT_LIST_HEAD(&client->condition_list);
lttng_dynamic_buffer_init(&client->communication.outbound.buffer);
client->communication.inbound.expect_creds = true;
lttng_dynamic_buffer_init(&client->communication.outbound.buffer);
client->communication.inbound.expect_creds = true;
- pthread_mutex_lock(&client->lock);
ret = client_reset_inbound_state(client);
ret = client_reset_inbound_state(client);
- pthread_mutex_unlock(&client->lock);
if (ret) {
ERR("[notification-thread] Failed to reset client communication's inbound state");
ret = 0;
if (ret) {
ERR("[notification-thread] Failed to reset client communication's inbound state");
ret = 0;
rcu_read_unlock();
return ret;
rcu_read_unlock();
return ret;
error:
notification_client_destroy(client, state);
return ret;
}
error:
notification_client_destroy(client, state);
return ret;
}
-/* RCU read-lock must be held by the caller. */
-/* Client lock must be held by the caller */
+/*
+ * RCU read-lock must be held by the caller.
+ * Client lock must _not_ be held by the caller.
+ */
static
int notification_thread_client_disconnect(
struct notification_client *client,
static
int notification_thread_client_disconnect(
struct notification_client *client,
struct lttng_condition_list_element *condition_list_element, *tmp;
/* Acquire the client lock to disable its communication atomically. */
struct lttng_condition_list_element *condition_list_element, *tmp;
/* Acquire the client lock to disable its communication atomically. */
+ pthread_mutex_lock(&client->lock);
client->communication.active = false;
client->communication.active = false;
+ cds_lfht_del(state->client_socket_ht, &client->client_socket_ht_node);
+ cds_lfht_del(state->client_id_ht, &client->client_id_ht_node);
+ pthread_mutex_unlock(&client->lock);
+
ret = lttng_poll_del(&state->events, client->socket);
if (ret) {
ERR("[notification-thread] Failed to remove client socket %d from poll set",
client->socket);
}
ret = lttng_poll_del(&state->events, client->socket);
if (ret) {
ERR("[notification-thread] Failed to remove client socket %d from poll set",
client->socket);
}
- cds_lfht_del(state->client_socket_ht, &client->client_socket_ht_node);
- cds_lfht_del(state->client_id_ht, &client->client_id_ht_node);
-
/* Release all conditions to which the client was subscribed. */
cds_list_for_each_entry_safe(condition_list_element, tmp,
&client->condition_list, node) {
/* Release all conditions to which the client was subscribed. */
cds_list_for_each_entry_safe(condition_list_element, tmp,
&client->condition_list, node) {
- pthread_mutex_lock(&client->lock);
ret = notification_thread_client_disconnect(client, state);
ret = notification_thread_client_disconnect(client, state);
- pthread_mutex_unlock(&client->lock);
end:
rcu_read_unlock();
return ret;
end:
rcu_read_unlock();
return ret;
client_socket_ht_node) {
int ret;
client_socket_ht_node) {
int ret;
- pthread_mutex_lock(&client->lock);
ret = notification_thread_client_disconnect(
client, state);
ret = notification_thread_client_disconnect(
client, state);
- pthread_mutex_unlock(&client->lock);
if (ret) {
error_encoutered = true;
}
if (ret) {
error_encoutered = true;
}
- client->communication.outbound.queued_command_reply = false;
- client->communication.outbound.dropped_notification = false;
break;
case CLIENT_TRANSMISSION_STATUS_QUEUED:
/*
break;
case CLIENT_TRANSMISSION_STATUS_QUEUED:
/*
status = CLIENT_TRANSMISSION_STATUS_QUEUED;
} else if (ret < 0) {
status = CLIENT_TRANSMISSION_STATUS_QUEUED;
} else if (ret < 0) {
- /* Generic error, disconnect the client. */
+ /* Generic error, disable the client's communication. */
ERR("[notification-thread] Failed to flush outgoing queue, disconnecting client (socket fd = %i)",
client->socket);
ERR("[notification-thread] Failed to flush outgoing queue, disconnecting client (socket fd = %i)",
client->socket);
+ client->communication.active = false;
status = CLIENT_TRANSMISSION_STATUS_FAIL;
} else {
/* No error and flushed the queue completely. */
status = CLIENT_TRANSMISSION_STATUS_FAIL;
} else {
/* No error and flushed the queue completely. */
+
+ client->communication.outbound.queued_command_reply = false;
+ client->communication.outbound.dropped_notification = false;
status = CLIENT_TRANSMISSION_STATUS_COMPLETE;
}
end:
status = CLIENT_TRANSMISSION_STATUS_COMPLETE;
}
end:
return CLIENT_TRANSMISSION_STATUS_ERROR;
}
return CLIENT_TRANSMISSION_STATUS_ERROR;
}
-/* Client lock must be acquired by caller. */
+/* Client lock must _not_ be held by the caller. */
static
int client_send_command_reply(struct notification_client *client,
struct notification_thread_state *state,
static
int client_send_command_reply(struct notification_client *client,
struct notification_thread_state *state,
char buffer[sizeof(msg) + sizeof(reply)];
enum client_transmission_status transmission_status;
char buffer[sizeof(msg) + sizeof(reply)];
enum client_transmission_status transmission_status;
- ASSERT_LOCKED(client->lock);
+ memcpy(buffer, &msg, sizeof(msg));
+ memcpy(buffer + sizeof(msg), &reply, sizeof(reply));
+ DBG("[notification-thread] Send command reply (%i)", (int) status);
+ pthread_mutex_lock(&client->lock);
if (client->communication.outbound.queued_command_reply) {
/* Protocol error. */
if (client->communication.outbound.queued_command_reply) {
/* Protocol error. */
- memcpy(buffer, &msg, sizeof(msg));
- memcpy(buffer + sizeof(msg), &reply, sizeof(reply));
- DBG("[notification-thread] Send command reply (%i)", (int) status);
-
/* Enqueue buffer to outgoing queue and flush it. */
ret = lttng_dynamic_buffer_append(
&client->communication.outbound.buffer,
buffer, sizeof(buffer));
if (ret) {
/* Enqueue buffer to outgoing queue and flush it. */
ret = lttng_dynamic_buffer_append(
&client->communication.outbound.buffer,
buffer, sizeof(buffer));
if (ret) {
}
transmission_status = client_flush_outgoing_queue(client);
}
transmission_status = client_flush_outgoing_queue(client);
+ if (client->communication.outbound.buffer.size != 0) {
+ /* Queue could not be emptied. */
+ client->communication.outbound.queued_command_reply = true;
+ }
+
+ pthread_mutex_unlock(&client->lock);
ret = client_handle_transmission_status(
client, transmission_status, state);
if (ret) {
goto error;
}
ret = client_handle_transmission_status(
client, transmission_status, state);
if (ret) {
goto error;
}
- if (client->communication.outbound.buffer.size != 0) {
- /* Queue could not be emptied. */
- client->communication.outbound.queued_command_reply = true;
- }
-
+error_unlock:
+ pthread_mutex_unlock(&client->lock);
struct notification_thread_state *state)
{
int ret;
struct notification_thread_state *state)
{
int ret;
-
- pthread_mutex_lock(&client->lock);
-
/*
* Receiving message header. The function will be called again
* once the rest of the message as been received and can be
/*
* Receiving message header. The function will be called again
* once the rest of the message as been received and can be
ret = lttng_dynamic_buffer_set_size(
&client->communication.inbound.buffer, msg->size);
end:
ret = lttng_dynamic_buffer_set_size(
&client->communication.inbound.buffer, msg->size);
end:
- pthread_mutex_unlock(&client->lock);
enum lttng_notification_channel_status status =
LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
char send_buffer[sizeof(msg_header) + sizeof(handshake_reply)];
enum lttng_notification_channel_status status =
LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
char send_buffer[sizeof(msg_header) + sizeof(handshake_reply)];
- enum client_transmission_status transmission_status;
-
- pthread_mutex_lock(&client->lock);
memcpy(send_buffer, &msg_header, sizeof(msg_header));
memcpy(send_buffer + sizeof(msg_header), &handshake_reply,
memcpy(send_buffer, &msg_header, sizeof(msg_header));
memcpy(send_buffer + sizeof(msg_header), &handshake_reply,
status = LTTNG_NOTIFICATION_CHANNEL_STATUS_UNSUPPORTED_VERSION;
}
status = LTTNG_NOTIFICATION_CHANNEL_STATUS_UNSUPPORTED_VERSION;
}
+ pthread_mutex_lock(&client->lock);
+ /* Outgoing queue will be flushed when the command reply is sent. */
ret = lttng_dynamic_buffer_append(
&client->communication.outbound.buffer, send_buffer,
sizeof(send_buffer));
if (ret) {
ERR("[notification-thread] Failed to send protocol version to notification channel client");
ret = lttng_dynamic_buffer_append(
&client->communication.outbound.buffer, send_buffer,
sizeof(send_buffer));
if (ret) {
ERR("[notification-thread] Failed to send protocol version to notification channel client");
}
client->validated = true;
client->communication.active = true;
}
client->validated = true;
client->communication.active = true;
+ pthread_mutex_unlock(&client->lock);
- transmission_status = client_flush_outgoing_queue(client);
- ret = client_handle_transmission_status(
- client, transmission_status, state);
+ /* Set reception state to receive the next message header. */
+ ret = client_reset_inbound_state(client);
+ ERR("[notification-thread] Failed to reset client communication's inbound state");
+ /* Flushes the outgoing queue. */
ret = client_send_command_reply(client, state, status);
if (ret) {
ERR("[notification-thread] Failed to send reply to notification channel client");
goto end;
}
ret = client_send_command_reply(client, state, status);
if (ret) {
ERR("[notification-thread] Failed to send reply to notification channel client");
goto end;
}
- /* Set reception state to receive the next message header. */
- ret = client_reset_inbound_state(client);
- if (ret) {
- ERR("[notification-thread] Failed to reset client communication's inbound state");
- goto end;
- }
-
-end:
pthread_mutex_unlock(&client->lock);
pthread_mutex_unlock(&client->lock);
0, -1);
size_t expected_condition_size;
0, -1);
size_t expected_condition_size;
- pthread_mutex_lock(&client->lock);
+ /*
+ * No need to lock client to sample the inbound state as the only
+ * other thread accessing clients (action executor) only uses the
+ * outbound state.
+ */
expected_condition_size = client->communication.inbound.buffer.size;
expected_condition_size = client->communication.inbound.buffer.size;
- pthread_mutex_unlock(&client->lock);
-
ret = lttng_condition_create_from_payload(&condition_view, &condition);
if (ret != expected_condition_size) {
ERR("[notification-thread] Malformed condition received from client");
ret = lttng_condition_create_from_payload(&condition_view, &condition);
if (ret != expected_condition_size) {
ERR("[notification-thread] Malformed condition received from client");
ret = notification_thread_client_unsubscribe(
client, condition, state, &status);
}
ret = notification_thread_client_unsubscribe(
client, condition, state, &status);
}
- if (ret) {
- goto end;
- }
- pthread_mutex_lock(&client->lock);
- ret = client_send_command_reply(client, state, status);
- ERR("[notification-thread] Failed to send reply to notification channel client");
- goto end_unlock;
}
/* Set reception state to receive the next message header. */
ret = client_reset_inbound_state(client);
if (ret) {
ERR("[notification-thread] Failed to reset client communication's inbound state");
}
/* Set reception state to receive the next message header. */
ret = client_reset_inbound_state(client);
if (ret) {
ERR("[notification-thread] Failed to reset client communication's inbound state");
+ goto end;
+ }
+
+ ret = client_send_command_reply(client, state, status);
+ if (ret) {
+ ERR("[notification-thread] Failed to send reply to notification channel client");
+ goto end;
-end_unlock:
- pthread_mutex_unlock(&client->lock);
- pthread_mutex_lock(&client->lock);
offset = client->communication.inbound.buffer.size -
client->communication.inbound.bytes_to_receive;
if (client->communication.inbound.expect_creds) {
offset = client->communication.inbound.buffer.size -
client->communication.inbound.bytes_to_receive;
if (client->communication.inbound.expect_creds) {
message_is_complete = client->communication.inbound
.bytes_to_receive == 0;
}
message_is_complete = client->communication.inbound
.bytes_to_receive == 0;
}
- pthread_mutex_unlock(&client->lock);
if (recv_ret < 0) {
goto error_disconnect_client;
}
if (recv_ret < 0) {
goto error_disconnect_client;
}
rcu_read_unlock();
return ret;
error_disconnect_client:
rcu_read_unlock();
return ret;
error_disconnect_client:
- pthread_mutex_lock(&client->lock);
ret = notification_thread_client_disconnect(client, state);
ret = notification_thread_client_disconnect(client, state);
- pthread_mutex_unlock(&client->lock);
pthread_mutex_lock(&client->lock);
transmission_status = client_flush_outgoing_queue(client);
pthread_mutex_lock(&client->lock);
transmission_status = client_flush_outgoing_queue(client);
+ pthread_mutex_unlock(&client->lock);
+
ret = client_handle_transmission_status(
client, transmission_status, state);
ret = client_handle_transmission_status(
client, transmission_status, state);
- pthread_mutex_unlock(&client->lock);
ret = 0;
pthread_mutex_lock(&client->lock);
ret = 0;
pthread_mutex_lock(&client->lock);
+ if (!client->communication.active) {
+ /*
+ * Skip inactive client (protocol error or
+ * disconnecting).
+ */
+ DBG("Skipping client at it is marked as inactive");
+ goto skip_client;
+ }
+
if (source_object_creds) {
if (client->uid != source_object_creds->uid &&
client->gid != source_object_creds->gid &&
if (source_object_creds) {
if (client->uid != source_object_creds->uid &&
client->gid != source_object_creds->gid &&
* object.
*/
DBG("[notification-thread] Skipping client at it does not have the object permission to receive notification for this trigger");
* object.
*/
DBG("[notification-thread] Skipping client at it does not have the object permission to receive notification for this trigger");
}
}
if (client->uid != trigger_creds->uid && client->gid != trigger_creds->gid) {
DBG("[notification-thread] Skipping client at it does not have the permission to receive notification for this trigger");
}
}
if (client->uid != trigger_creds->uid && client->gid != trigger_creds->gid) {
DBG("[notification-thread] Skipping client at it does not have the permission to receive notification for this trigger");
}
DBG("[notification-thread] Sending notification to client (fd = %i, %zu bytes)",
}
DBG("[notification-thread] Sending notification to client (fd = %i, %zu bytes)",
*/
ret = client_notification_overflow(client);
if (ret) {
*/
ret = client_notification_overflow(client);
if (ret) {
+ /* Fatal error. */
+ goto skip_client;
&client->communication.outbound.buffer,
&msg_payload.buffer);
if (ret) {
&client->communication.outbound.buffer,
&msg_payload.buffer);
if (ret) {
+ /* Fatal error. */
+ goto skip_client;
}
transmission_status = client_flush_outgoing_queue(client);
}
transmission_status = client_flush_outgoing_queue(client);
+ pthread_mutex_unlock(&client->lock);
ret = client_report(client, transmission_status, user_data);
if (ret) {
ret = client_report(client, transmission_status, user_data);
if (ret) {
+ /* Fatal error. */
+ goto end_unlock_list;
+
+ continue;
+
+skip_client:
pthread_mutex_unlock(&client->lock);
if (ret) {
pthread_mutex_unlock(&client->lock);
if (ret) {
goto end_unlock_list;
}
}
goto end_unlock_list;
}
}
};
struct notification_client {
};
struct notification_client {
- /* Nests within the notification_client_list lock. */
+ /*
+ * Nests within the notification_client_list lock.
+ *
+ * Protects the outbound communication and the active flag which
+ * is used by both the notification and action executor threads.
+ *
+ * The remaining fields of the object can be used without any
+ * synchronization as they are either immutable (id, creds, version) or
+ * only accessed by the notification thread.
+ */
pthread_mutex_t lock;
notification_client_id id;
int socket;
pthread_mutex_t lock;
notification_client_id id;
int socket;
LTTNG_HIDDEN
void notification_client_list_put(struct notification_client_list *list);
LTTNG_HIDDEN
void notification_client_list_put(struct notification_client_list *list);
+/* Only returns a non-zero value if a fatal error occurred. */
typedef int (*report_client_transmission_result_cb)(
struct notification_client *client,
enum client_transmission_status status,
typedef int (*report_client_transmission_result_cb)(
struct notification_client *client,
enum client_transmission_status status,