fs: dlm: get recovery sequence number as parameter

This patch removes a read of the ls->ls_recover_seq uint64_t number in
_create_rcom(). If the ls->ls_recover_seq is readed the ls_recover_lock
need to held. However this number was always readed before when any rcom
message is received and it's not necessary to read it again from a per
lockspace variable to use it for the replying message. This patch will
pass the sequence number as parameter so another read of ls->ls_recover_seq
and holding the ls->ls_recover_lock is not required.

Signed-off-by: Alexander Aring <aahringo@redhat.com>
Signed-off-by: David Teigland <teigland@redhat.com>
This commit is contained in:
Alexander Aring 2023-08-01 14:09:45 -04:00 committed by David Teigland
parent 643f5cfa61
commit c4f4e135c2
10 changed files with 99 additions and 85 deletions

View File

@ -58,7 +58,7 @@ void dlm_recover_dir_nodeid(struct dlm_ls *ls)
up_read(&ls->ls_root_sem);
}
int dlm_recover_directory(struct dlm_ls *ls)
int dlm_recover_directory(struct dlm_ls *ls, uint64_t seq)
{
struct dlm_member *memb;
char *b, *last_name = NULL;
@ -90,7 +90,7 @@ int dlm_recover_directory(struct dlm_ls *ls)
}
error = dlm_rcom_names(ls, memb->nodeid,
last_name, last_len);
last_name, last_len, seq);
if (error)
goto out_free;

View File

@ -15,7 +15,7 @@
int dlm_dir_nodeid(struct dlm_rsb *rsb);
int dlm_hash2nodeid(struct dlm_ls *ls, uint32_t hash);
void dlm_recover_dir_nodeid(struct dlm_ls *ls);
int dlm_recover_directory(struct dlm_ls *ls);
int dlm_recover_directory(struct dlm_ls *ls, uint64_t seq);
void dlm_copy_master_names(struct dlm_ls *ls, char *inbuf, int inlen,
char *outbuf, int outlen, int nodeid);

View File

@ -5464,7 +5464,8 @@ int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
}
/* needs at least dlm_rcom + rcom_lock */
int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc,
uint64_t seq)
{
struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
struct dlm_rsb *r;
@ -5509,7 +5510,7 @@ int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid,
result);
dlm_send_rcom_lock(r, lkb);
dlm_send_rcom_lock(r, lkb, seq);
goto out;
case -EEXIST:
case 0:

View File

@ -37,7 +37,8 @@ void dlm_recover_grant(struct dlm_ls *ls);
int dlm_recover_waiters_post(struct dlm_ls *ls);
void dlm_recover_waiters_pre(struct dlm_ls *ls);
int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc);
int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc);
int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc,
uint64_t seq);
int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua, int mode,
uint32_t flags, void *name, unsigned int namelen);

View File

@ -449,7 +449,7 @@ static void make_member_array(struct dlm_ls *ls)
/* send a status request to all members just to establish comms connections */
static int ping_members(struct dlm_ls *ls)
static int ping_members(struct dlm_ls *ls, uint64_t seq)
{
struct dlm_member *memb;
int error = 0;
@ -459,7 +459,7 @@ static int ping_members(struct dlm_ls *ls)
error = -EINTR;
break;
}
error = dlm_rcom_status(ls, memb->nodeid, 0);
error = dlm_rcom_status(ls, memb->nodeid, 0, seq);
if (error)
break;
}
@ -607,7 +607,7 @@ int dlm_recover_members(struct dlm_ls *ls, struct dlm_recover *rv, int *neg_out)
make_member_array(ls);
*neg_out = neg;
error = ping_members(ls);
error = ping_members(ls, rv->seq);
log_rinfo(ls, "dlm_recover_members %d nodes", ls->ls_num_nodes);
return error;
}

View File

@ -28,7 +28,8 @@ static int rcom_response(struct dlm_ls *ls)
}
static void _create_rcom(struct dlm_ls *ls, int to_nodeid, int type, int len,
struct dlm_rcom **rc_ret, char *mb, int mb_len)
struct dlm_rcom **rc_ret, char *mb, int mb_len,
uint64_t seq)
{
struct dlm_rcom *rc;
@ -41,16 +42,14 @@ static void _create_rcom(struct dlm_ls *ls, int to_nodeid, int type, int len,
rc->rc_header.h_cmd = DLM_RCOM;
rc->rc_type = cpu_to_le32(type);
spin_lock(&ls->ls_recover_lock);
rc->rc_seq = cpu_to_le64(ls->ls_recover_seq);
spin_unlock(&ls->ls_recover_lock);
rc->rc_seq = cpu_to_le64(seq);
*rc_ret = rc;
}
static int create_rcom(struct dlm_ls *ls, int to_nodeid, int type, int len,
struct dlm_rcom **rc_ret, struct dlm_mhandle **mh_ret)
struct dlm_rcom **rc_ret, struct dlm_mhandle **mh_ret,
uint64_t seq)
{
int mb_len = sizeof(struct dlm_rcom) + len;
struct dlm_mhandle *mh;
@ -63,14 +62,14 @@ static int create_rcom(struct dlm_ls *ls, int to_nodeid, int type, int len,
return -ENOBUFS;
}
_create_rcom(ls, to_nodeid, type, len, rc_ret, mb, mb_len);
_create_rcom(ls, to_nodeid, type, len, rc_ret, mb, mb_len, seq);
*mh_ret = mh;
return 0;
}
static int create_rcom_stateless(struct dlm_ls *ls, int to_nodeid, int type,
int len, struct dlm_rcom **rc_ret,
struct dlm_msg **msg_ret)
struct dlm_msg **msg_ret, uint64_t seq)
{
int mb_len = sizeof(struct dlm_rcom) + len;
struct dlm_msg *msg;
@ -84,7 +83,7 @@ static int create_rcom_stateless(struct dlm_ls *ls, int to_nodeid, int type,
return -ENOBUFS;
}
_create_rcom(ls, to_nodeid, type, len, rc_ret, mb, mb_len);
_create_rcom(ls, to_nodeid, type, len, rc_ret, mb, mb_len, seq);
*msg_ret = msg;
return 0;
}
@ -170,7 +169,8 @@ static void disallow_sync_reply(struct dlm_ls *ls)
* node's rcom_config.
*/
int dlm_rcom_status(struct dlm_ls *ls, int nodeid, uint32_t status_flags)
int dlm_rcom_status(struct dlm_ls *ls, int nodeid, uint32_t status_flags,
uint64_t seq)
{
struct dlm_rcom *rc;
struct dlm_msg *msg;
@ -186,7 +186,8 @@ int dlm_rcom_status(struct dlm_ls *ls, int nodeid, uint32_t status_flags)
retry:
error = create_rcom_stateless(ls, nodeid, DLM_RCOM_STATUS,
sizeof(struct rcom_status), &rc, &msg);
sizeof(struct rcom_status), &rc, &msg,
seq);
if (error)
goto out;
@ -220,7 +221,8 @@ retry:
return error;
}
static void receive_rcom_status(struct dlm_ls *ls, struct dlm_rcom *rc_in)
static void receive_rcom_status(struct dlm_ls *ls, struct dlm_rcom *rc_in,
uint64_t seq)
{
struct dlm_rcom *rc;
struct rcom_status *rs;
@ -251,7 +253,7 @@ static void receive_rcom_status(struct dlm_ls *ls, struct dlm_rcom *rc_in)
do_create:
error = create_rcom_stateless(ls, nodeid, DLM_RCOM_STATUS_REPLY,
len, &rc, &msg);
len, &rc, &msg, seq);
if (error)
return;
@ -302,7 +304,8 @@ static void receive_sync_reply(struct dlm_ls *ls, struct dlm_rcom *rc_in)
spin_unlock(&ls->ls_rcom_spin);
}
int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name, int last_len)
int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name,
int last_len, uint64_t seq)
{
struct dlm_rcom *rc;
struct dlm_msg *msg;
@ -312,7 +315,7 @@ int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name, int last_len)
retry:
error = create_rcom_stateless(ls, nodeid, DLM_RCOM_NAMES, last_len,
&rc, &msg);
&rc, &msg, seq);
if (error)
goto out;
memcpy(rc->rc_buf, last_name, last_len);
@ -330,7 +333,8 @@ retry:
return error;
}
static void receive_rcom_names(struct dlm_ls *ls, struct dlm_rcom *rc_in)
static void receive_rcom_names(struct dlm_ls *ls, struct dlm_rcom *rc_in,
uint64_t seq)
{
struct dlm_rcom *rc;
int error, inlen, outlen, nodeid;
@ -342,7 +346,7 @@ static void receive_rcom_names(struct dlm_ls *ls, struct dlm_rcom *rc_in)
outlen = DLM_MAX_APP_BUFSIZE - sizeof(struct dlm_rcom);
error = create_rcom_stateless(ls, nodeid, DLM_RCOM_NAMES_REPLY, outlen,
&rc, &msg);
&rc, &msg, seq);
if (error)
return;
rc->rc_id = rc_in->rc_id;
@ -353,7 +357,7 @@ static void receive_rcom_names(struct dlm_ls *ls, struct dlm_rcom *rc_in)
send_rcom_stateless(msg, rc);
}
int dlm_send_rcom_lookup(struct dlm_rsb *r, int dir_nodeid)
int dlm_send_rcom_lookup(struct dlm_rsb *r, int dir_nodeid, uint64_t seq)
{
struct dlm_rcom *rc;
struct dlm_mhandle *mh;
@ -361,7 +365,7 @@ int dlm_send_rcom_lookup(struct dlm_rsb *r, int dir_nodeid)
int error;
error = create_rcom(ls, dir_nodeid, DLM_RCOM_LOOKUP, r->res_length,
&rc, &mh);
&rc, &mh, seq);
if (error)
goto out;
memcpy(rc->rc_buf, r->res_name, r->res_length);
@ -372,7 +376,8 @@ int dlm_send_rcom_lookup(struct dlm_rsb *r, int dir_nodeid)
return error;
}
static void receive_rcom_lookup(struct dlm_ls *ls, struct dlm_rcom *rc_in)
static void receive_rcom_lookup(struct dlm_ls *ls, struct dlm_rcom *rc_in,
uint64_t seq)
{
struct dlm_rcom *rc;
struct dlm_mhandle *mh;
@ -387,7 +392,8 @@ static void receive_rcom_lookup(struct dlm_ls *ls, struct dlm_rcom *rc_in)
return;
}
error = create_rcom(ls, nodeid, DLM_RCOM_LOOKUP_REPLY, 0, &rc, &mh);
error = create_rcom(ls, nodeid, DLM_RCOM_LOOKUP_REPLY, 0, &rc, &mh,
seq);
if (error)
return;
@ -437,7 +443,7 @@ static void pack_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb,
memcpy(rl->rl_lvb, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
}
int dlm_send_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
int dlm_send_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb, uint64_t seq)
{
struct dlm_ls *ls = r->res_ls;
struct dlm_rcom *rc;
@ -448,7 +454,8 @@ int dlm_send_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
if (lkb->lkb_lvbptr)
len += ls->ls_lvblen;
error = create_rcom(ls, r->res_nodeid, DLM_RCOM_LOCK, len, &rc, &mh);
error = create_rcom(ls, r->res_nodeid, DLM_RCOM_LOCK, len, &rc, &mh,
seq);
if (error)
goto out;
@ -462,7 +469,8 @@ int dlm_send_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
}
/* needs at least dlm_rcom + rcom_lock */
static void receive_rcom_lock(struct dlm_ls *ls, struct dlm_rcom *rc_in)
static void receive_rcom_lock(struct dlm_ls *ls, struct dlm_rcom *rc_in,
uint64_t seq)
{
struct dlm_rcom *rc;
struct dlm_mhandle *mh;
@ -471,7 +479,7 @@ static void receive_rcom_lock(struct dlm_ls *ls, struct dlm_rcom *rc_in)
dlm_recover_master_copy(ls, rc_in);
error = create_rcom(ls, nodeid, DLM_RCOM_LOCK_REPLY,
sizeof(struct rcom_lock), &rc, &mh);
sizeof(struct rcom_lock), &rc, &mh, seq);
if (error)
return;
@ -620,21 +628,21 @@ void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid)
switch (rc->rc_type) {
case cpu_to_le32(DLM_RCOM_STATUS):
receive_rcom_status(ls, rc);
receive_rcom_status(ls, rc, seq);
break;
case cpu_to_le32(DLM_RCOM_NAMES):
receive_rcom_names(ls, rc);
receive_rcom_names(ls, rc, seq);
break;
case cpu_to_le32(DLM_RCOM_LOOKUP):
receive_rcom_lookup(ls, rc);
receive_rcom_lookup(ls, rc, seq);
break;
case cpu_to_le32(DLM_RCOM_LOCK):
if (le16_to_cpu(rc->rc_header.h_length) < lock_size)
goto Eshort;
receive_rcom_lock(ls, rc);
receive_rcom_lock(ls, rc, seq);
break;
case cpu_to_le32(DLM_RCOM_STATUS_REPLY):
@ -652,7 +660,7 @@ void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid)
case cpu_to_le32(DLM_RCOM_LOCK_REPLY):
if (le16_to_cpu(rc->rc_header.h_length) < lock_size)
goto Eshort;
dlm_recover_process_copy(ls, rc);
dlm_recover_process_copy(ls, rc, seq);
break;
default:

View File

@ -12,10 +12,12 @@
#ifndef __RCOM_DOT_H__
#define __RCOM_DOT_H__
int dlm_rcom_status(struct dlm_ls *ls, int nodeid, uint32_t status_flags);
int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name,int last_len);
int dlm_send_rcom_lookup(struct dlm_rsb *r, int dir_nodeid);
int dlm_send_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
int dlm_rcom_status(struct dlm_ls *ls, int nodeid, uint32_t status_flags,
uint64_t seq);
int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name,
int last_len, uint64_t seq);
int dlm_send_rcom_lookup(struct dlm_rsb *r, int dir_nodeid, uint64_t seq);
int dlm_send_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb, uint64_t seq);
void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid);
int dlm_send_ls_not_ready(int nodeid, struct dlm_rcom *rc_in);

View File

@ -93,7 +93,7 @@ void dlm_set_recover_status(struct dlm_ls *ls, uint32_t status)
}
static int wait_status_all(struct dlm_ls *ls, uint32_t wait_status,
int save_slots)
int save_slots, uint64_t seq)
{
struct dlm_rcom *rc = ls->ls_recover_buf;
struct dlm_member *memb;
@ -107,7 +107,7 @@ static int wait_status_all(struct dlm_ls *ls, uint32_t wait_status,
goto out;
}
error = dlm_rcom_status(ls, memb->nodeid, 0);
error = dlm_rcom_status(ls, memb->nodeid, 0, seq);
if (error)
goto out;
@ -126,7 +126,7 @@ static int wait_status_all(struct dlm_ls *ls, uint32_t wait_status,
}
static int wait_status_low(struct dlm_ls *ls, uint32_t wait_status,
uint32_t status_flags)
uint32_t status_flags, uint64_t seq)
{
struct dlm_rcom *rc = ls->ls_recover_buf;
int error = 0, delay = 0, nodeid = ls->ls_low_nodeid;
@ -137,7 +137,7 @@ static int wait_status_low(struct dlm_ls *ls, uint32_t wait_status,
goto out;
}
error = dlm_rcom_status(ls, nodeid, status_flags);
error = dlm_rcom_status(ls, nodeid, status_flags, seq);
if (error)
break;
@ -151,22 +151,22 @@ static int wait_status_low(struct dlm_ls *ls, uint32_t wait_status,
return error;
}
static int wait_status(struct dlm_ls *ls, uint32_t status)
static int wait_status(struct dlm_ls *ls, uint32_t status, uint64_t seq)
{
uint32_t status_all = status << 1;
int error;
if (ls->ls_low_nodeid == dlm_our_nodeid()) {
error = wait_status_all(ls, status, 0);
error = wait_status_all(ls, status, 0, seq);
if (!error)
dlm_set_recover_status(ls, status_all);
} else
error = wait_status_low(ls, status_all, 0);
error = wait_status_low(ls, status_all, 0, seq);
return error;
}
int dlm_recover_members_wait(struct dlm_ls *ls)
int dlm_recover_members_wait(struct dlm_ls *ls, uint64_t seq)
{
struct dlm_member *memb;
struct dlm_slot *slots;
@ -180,7 +180,7 @@ int dlm_recover_members_wait(struct dlm_ls *ls)
}
if (ls->ls_low_nodeid == dlm_our_nodeid()) {
error = wait_status_all(ls, DLM_RS_NODES, 1);
error = wait_status_all(ls, DLM_RS_NODES, 1, seq);
if (error)
goto out;
@ -199,7 +199,8 @@ int dlm_recover_members_wait(struct dlm_ls *ls)
dlm_set_recover_status(ls, DLM_RS_NODES_ALL);
}
} else {
error = wait_status_low(ls, DLM_RS_NODES_ALL, DLM_RSF_NEED_SLOTS);
error = wait_status_low(ls, DLM_RS_NODES_ALL,
DLM_RSF_NEED_SLOTS, seq);
if (error)
goto out;
@ -209,19 +210,19 @@ int dlm_recover_members_wait(struct dlm_ls *ls)
return error;
}
int dlm_recover_directory_wait(struct dlm_ls *ls)
int dlm_recover_directory_wait(struct dlm_ls *ls, uint64_t seq)
{
return wait_status(ls, DLM_RS_DIR);
return wait_status(ls, DLM_RS_DIR, seq);
}
int dlm_recover_locks_wait(struct dlm_ls *ls)
int dlm_recover_locks_wait(struct dlm_ls *ls, uint64_t seq)
{
return wait_status(ls, DLM_RS_LOCKS);
return wait_status(ls, DLM_RS_LOCKS, seq);
}
int dlm_recover_done_wait(struct dlm_ls *ls)
int dlm_recover_done_wait(struct dlm_ls *ls, uint64_t seq)
{
return wait_status(ls, DLM_RS_DONE);
return wait_status(ls, DLM_RS_DONE, seq);
}
/*
@ -441,7 +442,7 @@ static void set_new_master(struct dlm_rsb *r)
* equals our_nodeid below).
*/
static int recover_master(struct dlm_rsb *r, unsigned int *count)
static int recover_master(struct dlm_rsb *r, unsigned int *count, uint64_t seq)
{
struct dlm_ls *ls = r->res_ls;
int our_nodeid, dir_nodeid;
@ -472,7 +473,7 @@ static int recover_master(struct dlm_rsb *r, unsigned int *count)
error = 0;
} else {
recover_idr_add(r);
error = dlm_send_rcom_lookup(r, dir_nodeid);
error = dlm_send_rcom_lookup(r, dir_nodeid, seq);
}
(*count)++;
@ -520,7 +521,7 @@ static int recover_master_static(struct dlm_rsb *r, unsigned int *count)
* the correct dir node.
*/
int dlm_recover_masters(struct dlm_ls *ls)
int dlm_recover_masters(struct dlm_ls *ls, uint64_t seq)
{
struct dlm_rsb *r;
unsigned int total = 0;
@ -542,7 +543,7 @@ int dlm_recover_masters(struct dlm_ls *ls)
if (nodir)
error = recover_master_static(r, &count);
else
error = recover_master(r, &count);
error = recover_master(r, &count, seq);
unlock_rsb(r);
cond_resched();
total++;
@ -614,13 +615,14 @@ int dlm_recover_master_reply(struct dlm_ls *ls, struct dlm_rcom *rc)
* an equal number of replies then recovery for the rsb is done
*/
static int recover_locks_queue(struct dlm_rsb *r, struct list_head *head)
static int recover_locks_queue(struct dlm_rsb *r, struct list_head *head,
uint64_t seq)
{
struct dlm_lkb *lkb;
int error = 0;
list_for_each_entry(lkb, head, lkb_statequeue) {
error = dlm_send_rcom_lock(r, lkb);
error = dlm_send_rcom_lock(r, lkb, seq);
if (error)
break;
r->res_recover_locks_count++;
@ -629,7 +631,7 @@ static int recover_locks_queue(struct dlm_rsb *r, struct list_head *head)
return error;
}
static int recover_locks(struct dlm_rsb *r)
static int recover_locks(struct dlm_rsb *r, uint64_t seq)
{
int error = 0;
@ -637,13 +639,13 @@ static int recover_locks(struct dlm_rsb *r)
DLM_ASSERT(!r->res_recover_locks_count, dlm_dump_rsb(r););
error = recover_locks_queue(r, &r->res_grantqueue);
error = recover_locks_queue(r, &r->res_grantqueue, seq);
if (error)
goto out;
error = recover_locks_queue(r, &r->res_convertqueue);
error = recover_locks_queue(r, &r->res_convertqueue, seq);
if (error)
goto out;
error = recover_locks_queue(r, &r->res_waitqueue);
error = recover_locks_queue(r, &r->res_waitqueue, seq);
if (error)
goto out;
@ -656,7 +658,7 @@ static int recover_locks(struct dlm_rsb *r)
return error;
}
int dlm_recover_locks(struct dlm_ls *ls)
int dlm_recover_locks(struct dlm_ls *ls, uint64_t seq)
{
struct dlm_rsb *r;
int error, count = 0;
@ -677,7 +679,7 @@ int dlm_recover_locks(struct dlm_ls *ls)
goto out;
}
error = recover_locks(r);
error = recover_locks(r, seq);
if (error) {
up_read(&ls->ls_root_sem);
goto out;

View File

@ -15,13 +15,13 @@
int dlm_wait_function(struct dlm_ls *ls, int (*testfn) (struct dlm_ls *ls));
uint32_t dlm_recover_status(struct dlm_ls *ls);
void dlm_set_recover_status(struct dlm_ls *ls, uint32_t status);
int dlm_recover_members_wait(struct dlm_ls *ls);
int dlm_recover_directory_wait(struct dlm_ls *ls);
int dlm_recover_locks_wait(struct dlm_ls *ls);
int dlm_recover_done_wait(struct dlm_ls *ls);
int dlm_recover_masters(struct dlm_ls *ls);
int dlm_recover_members_wait(struct dlm_ls *ls, uint64_t seq);
int dlm_recover_directory_wait(struct dlm_ls *ls, uint64_t seq);
int dlm_recover_locks_wait(struct dlm_ls *ls, uint64_t seq);
int dlm_recover_done_wait(struct dlm_ls *ls, uint64_t seq);
int dlm_recover_masters(struct dlm_ls *ls, uint64_t seq);
int dlm_recover_master_reply(struct dlm_ls *ls, struct dlm_rcom *rc);
int dlm_recover_locks(struct dlm_ls *ls);
int dlm_recover_locks(struct dlm_ls *ls, uint64_t seq);
void dlm_recovered_lock(struct dlm_rsb *r);
int dlm_create_root_list(struct dlm_ls *ls);
void dlm_release_root_list(struct dlm_ls *ls);

View File

@ -90,7 +90,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
dlm_set_recover_status(ls, DLM_RS_NODES);
error = dlm_recover_members_wait(ls);
error = dlm_recover_members_wait(ls, rv->seq);
if (error) {
log_rinfo(ls, "dlm_recover_members_wait error %d", error);
goto fail;
@ -103,7 +103,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
* nodes their master rsb names that hash to us.
*/
error = dlm_recover_directory(ls);
error = dlm_recover_directory(ls, rv->seq);
if (error) {
log_rinfo(ls, "dlm_recover_directory error %d", error);
goto fail;
@ -111,7 +111,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
dlm_set_recover_status(ls, DLM_RS_DIR);
error = dlm_recover_directory_wait(ls);
error = dlm_recover_directory_wait(ls, rv->seq);
if (error) {
log_rinfo(ls, "dlm_recover_directory_wait error %d", error);
goto fail;
@ -145,7 +145,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
* departed nodes.
*/
error = dlm_recover_masters(ls);
error = dlm_recover_masters(ls, rv->seq);
if (error) {
log_rinfo(ls, "dlm_recover_masters error %d", error);
goto fail;
@ -155,7 +155,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
* Send our locks on remastered rsb's to the new masters.
*/
error = dlm_recover_locks(ls);
error = dlm_recover_locks(ls, rv->seq);
if (error) {
log_rinfo(ls, "dlm_recover_locks error %d", error);
goto fail;
@ -163,7 +163,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
dlm_set_recover_status(ls, DLM_RS_LOCKS);
error = dlm_recover_locks_wait(ls);
error = dlm_recover_locks_wait(ls, rv->seq);
if (error) {
log_rinfo(ls, "dlm_recover_locks_wait error %d", error);
goto fail;
@ -187,7 +187,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
*/
dlm_set_recover_status(ls, DLM_RS_LOCKS);
error = dlm_recover_locks_wait(ls);
error = dlm_recover_locks_wait(ls, rv->seq);
if (error) {
log_rinfo(ls, "dlm_recover_locks_wait error %d", error);
goto fail;
@ -206,7 +206,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
dlm_set_recover_status(ls, DLM_RS_DONE);
error = dlm_recover_done_wait(ls);
error = dlm_recover_done_wait(ls, rv->seq);
if (error) {
log_rinfo(ls, "dlm_recover_done_wait error %d", error);
goto fail;