fs: dlm: add send ack threshold and append acks to msgs
This patch changes when we send an ack back to tell the other side that it can free messages which have arrived on the receiver node. (Random reconnects, e.g. TCP resets, are also handled at the application layer so that DLM does not run into a deadlock state.) The current handling has the following problems:

1. We end up in situations where we send out only a 16-byte ack message and no other messages, even though DLM has logic to combine as many messages as possible into one send() socket call. This behaviour can be observed with "trace-cmd start -e dlm_recv" by watching the ret field stay at 16 bytes.

2. If processing of DLM messages never ends because we keep receiving messages, we never send an ack back, since that only happens when the processing loop ends.

This patch introduces a likely and an unlikely threshold case. The likely case sends an ack back on the transmit path once the threshold of processed upper-layer-protocol messages is reached. This solves issue 1 because the ack is sent along with another normal DLM message, and it solves issue 2 because it is not part of the processing loop. The unlikely case has a bigger threshold and triggers when we only receive messages and do not transmit anything back. It prevents the sending node from holding on to a lot of messages for a long time, because we still occasionally send an ack back to tell the sender to finally release them. The atomic cmpxchg() provides an atomic ack send combined with a reset of the upper-layer-protocol delivery counter.

Signed-off-by: Alexander Aring <aahringo@redhat.com>
Signed-off-by: David Teigland <teigland@redhat.com>
parent d00725cab2
commit 1696c75f18
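To illustrate the counter-reset pattern described in the commit message, here is a minimal user-space sketch of the threshold check. It uses C11 atomics and hypothetical names (ack_threshold_hit, ulp_delivered), not the kernel's atomic_t API; it only models the "exactly one caller resets the counter and sends the ack" behaviour that the dlm_send_ack_threshold() hunk below implements with atomic_cmpxchg().

#include <stdatomic.h>
#include <stdbool.h>

/* Hypothetical user-space model of the threshold logic: read the counter,
 * and if it exceeds the threshold try to reset it to zero with a
 * compare-and-exchange.  Only the caller whose compare-and-exchange
 * succeeds reports true and should send the ack; racing callers either
 * see the already-reset counter or retry the loop.
 */
static bool ack_threshold_hit(atomic_uint *ulp_delivered, unsigned int threshold)
{
	unsigned int oval;

	do {
		oval = atomic_load(ulp_delivered);
		if (oval <= threshold)
			return false;	/* threshold not reached, keep counting */
		/* try to reset the counter; retry if another caller raced us */
	} while (!atomic_compare_exchange_weak(ulp_delivered, &oval, 0));

	return true;	/* this caller won the race and should send the ack */
}

In the patch itself the same pattern is applied with two thresholds: DLM_SEND_ACK_BACK_MSG_THRESHOLD (32) on the transmit path and DLM_RECV_ACK_BACK_MSG_THRESHOLD (eight times larger) on the receive-only path.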
fs/dlm/lowcomms.c

@@ -860,30 +860,8 @@ struct dlm_processed_nodes {
	struct list_head list;
};

static void add_processed_node(int nodeid, struct list_head *processed_nodes)
{
	struct dlm_processed_nodes *n;

	list_for_each_entry(n, processed_nodes, list) {
		/* we already remembered this node */
		if (n->nodeid == nodeid)
			return;
	}

	/* if it's fails in worst case we simple don't send an ack back.
	 * We try it next time.
	 */
	n = kmalloc(sizeof(*n), GFP_NOFS);
	if (!n)
		return;

	n->nodeid = nodeid;
	list_add(&n->list, processed_nodes);
}

static void process_dlm_messages(struct work_struct *work)
{
	struct dlm_processed_nodes *n, *n_tmp;
	struct processqueue_entry *pentry;
	LIST_HEAD(processed_nodes);

@@ -902,7 +880,6 @@ static void process_dlm_messages(struct work_struct *work)
	for (;;) {
		dlm_process_incoming_buffer(pentry->nodeid, pentry->buf,
					    pentry->buflen);
		add_processed_node(pentry->nodeid, &processed_nodes);
		free_processqueue_entry(pentry);

		spin_lock(&processqueue_lock);
@@ -917,13 +894,6 @@ static void process_dlm_messages(struct work_struct *work)
		list_del(&pentry->list);
		spin_unlock(&processqueue_lock);
	}

	/* send ack back after we processed couple of messages */
	list_for_each_entry_safe(n, n_tmp, &processed_nodes, list) {
		list_del(&n->list);
		dlm_midcomms_receive_done(n->nodeid);
		kfree(n);
	}
}

/* Data received from remote end */
fs/dlm/midcomms.c

@@ -148,6 +148,8 @@
/* 5 seconds wait to sync ending of dlm */
#define DLM_SHUTDOWN_TIMEOUT msecs_to_jiffies(5000)
#define DLM_VERSION_NOT_SET 0
#define DLM_SEND_ACK_BACK_MSG_THRESHOLD 32
#define DLM_RECV_ACK_BACK_MSG_THRESHOLD (DLM_SEND_ACK_BACK_MSG_THRESHOLD * 8)

struct midcomms_node {
	int nodeid;
@@ -165,7 +167,7 @@ struct midcomms_node {
#define DLM_NODE_FLAG_CLOSE 1
#define DLM_NODE_FLAG_STOP_TX 2
#define DLM_NODE_FLAG_STOP_RX 3
#define DLM_NODE_ULP_DELIVERED 4
	atomic_t ulp_delivered;
	unsigned long flags;
	wait_queue_head_t shutdown_wait;

@@ -319,6 +321,7 @@ static void midcomms_node_reset(struct midcomms_node *node)

	atomic_set(&node->seq_next, DLM_SEQ_INIT);
	atomic_set(&node->seq_send, DLM_SEQ_INIT);
	atomic_set(&node->ulp_delivered, 0);
	node->version = DLM_VERSION_NOT_SET;
	node->flags = 0;

@@ -393,6 +396,28 @@ static int dlm_send_ack(int nodeid, uint32_t seq)
	return 0;
}

static void dlm_send_ack_threshold(struct midcomms_node *node,
				   uint32_t threshold)
{
	uint32_t oval, nval;
	bool send_ack;

	/* let only send one user trigger threshold to send ack back */
	do {
		oval = atomic_read(&node->ulp_delivered);
		send_ack = (oval > threshold);

		/* abort if threshold is not reached */
		if (!send_ack)
			break;

		nval = 0;
		/* try to reset ulp_delivered counter */
	} while (atomic_cmpxchg(&node->ulp_delivered, oval, nval) != oval);

	if (send_ack)
		dlm_send_ack(node->nodeid, atomic_read(&node->seq_next));
}

static int dlm_send_fin(struct midcomms_node *node,
			void (*ack_rcv)(struct midcomms_node *node))
{
@@ -560,7 +585,9 @@ static void dlm_midcomms_receive_buffer(union dlm_packet *p,
			WARN_ON_ONCE(test_bit(DLM_NODE_FLAG_STOP_RX, &node->flags));
			dlm_receive_buffer_3_2_trace(seq, p);
			dlm_receive_buffer(p, node->nodeid);
			set_bit(DLM_NODE_ULP_DELIVERED, &node->flags);
			atomic_inc(&node->ulp_delivered);
			/* unlikely case to send ack back when we don't transmit */
			dlm_send_ack_threshold(node, DLM_RECV_ACK_BACK_MSG_THRESHOLD);
			break;
		}
	} else {
@@ -969,49 +996,6 @@ int dlm_process_incoming_buffer(int nodeid, unsigned char *buf, int len)
	return ret;
}

void dlm_midcomms_receive_done(int nodeid)
{
	struct midcomms_node *node;
	int idx;

	idx = srcu_read_lock(&nodes_srcu);
	node = nodeid2node(nodeid, 0);
	if (!node) {
		srcu_read_unlock(&nodes_srcu, idx);
		return;
	}

	/* old protocol, we do nothing */
	switch (node->version) {
	case DLM_VERSION_3_2:
		break;
	default:
		srcu_read_unlock(&nodes_srcu, idx);
		return;
	}

	/* do nothing if we didn't delivered stateful to ulp */
	if (!test_and_clear_bit(DLM_NODE_ULP_DELIVERED,
				&node->flags)) {
		srcu_read_unlock(&nodes_srcu, idx);
		return;
	}

	spin_lock(&node->state_lock);
	/* we only ack if state is ESTABLISHED */
	switch (node->state) {
	case DLM_ESTABLISHED:
		spin_unlock(&node->state_lock);
		dlm_send_ack(node->nodeid, atomic_read(&node->seq_next));
		break;
	default:
		spin_unlock(&node->state_lock);
		/* do nothing FIN has it's own ack send */
		break;
	}
	srcu_read_unlock(&nodes_srcu, idx);
}

void dlm_midcomms_unack_msg_resend(int nodeid)
{
	struct midcomms_node *node;
@@ -1142,6 +1126,8 @@ struct dlm_mhandle *dlm_midcomms_get_mhandle(int nodeid, int len,
			goto err;
		}

		/* send ack back if necessary */
		dlm_send_ack_threshold(node, DLM_SEND_ACK_BACK_MSG_THRESHOLD);
		break;
	default:
		dlm_free_mhandle(mh);