1    	/*
2    	 * Copyright (C) 2012, The Linux Box Corporation
3    	 * Copyright (c) 2012-2018 Red Hat, Inc. and/or its affiliates.
4    	 * Contributor : Matt Benjamin <matt@linuxbox.com>
5    	 *               William Allen Simpson <william.allen.simpson@gmail.com>
6    	 *
7    	 * Some portions Copyright CEA/DAM/DIF  (2008)
8    	 * contributeur : Philippe DENIEL   philippe.deniel@cea.fr
9    	 *                Thomas LEIBOVICI  thomas.leibovici@cea.fr
10   	 *
11   	 *
12   	 * This program is free software; you can redistribute it and/or
13   	 * modify it under the terms of the GNU Lesser General Public License
14   	 * as published by the Free Software Foundation; either version 3 of
15   	 * the License, or (at your option) any later version.
16   	 *
17   	 * This program is distributed in the hope that it will be useful, but
18   	 * WITHOUT ANY WARRANTY; without even the implied warranty of
19   	 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
20   	 * Lesser General Public License for more details.
21   	 *
22   	 * You should have received a copy of the GNU Lesser General Public
23   	 * License along with this library; if not, write to the Free Software
24   	 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
25   	 * 02110-1301 USA
26   	 *
27   	 * -------------
28   	 */
29   	
30   	/**
31   	 * @file nfs_rpc_callback.c
32   	 * @author Matt Benjamin <matt@linuxbox.com>
33   	 * @author Lee Dobryden <lee@linuxbox.com>
34   	 * @author Adam C. Emerson <aemerson@linuxbox.com>
35   	 * @brief RPC callback dispatch package
36   	 *
37   	 * This module implements APIs for submission, and dispatch of NFSv4.0
38   	 * and NFSv4.1 callbacks.
39   	 *
40   	 */
41   	
42   	#include "config.h"
43   	#include <unistd.h>
44   	#include <sys/types.h>
45   	#include <sys/stat.h>
46   	#include <sys/param.h>
47   	#include <time.h>
48   	#include <pthread.h>
49   	#include <assert.h>
50   	#include <arpa/inet.h>
51   	#include "fsal.h"
52   	#include "nfs_core.h"
53   	#include "log.h"
54   	#include "nfs_rpc_callback.h"
55   	#include "nfs4.h"
56   	#ifdef _HAVE_GSSAPI
57   	#include "gss_credcache.h"
58   	#endif /* _HAVE_GSSAPI */
59   	#include "sal_data.h"
60   	#include "sal_functions.h"
61   	#include <misc/timespec.h>
62   	
63   	const struct __netid_nc_table netid_nc_table[9] = {
64   		{
65   		"-", _NC_ERR, 0}, {
66   		"tcp", _NC_TCP, AF_INET}, {
67   		"tcp6", _NC_TCP6, AF_INET6}, {
68   		"rdma", _NC_RDMA, AF_INET}, {
69   		"rdma6", _NC_RDMA6, AF_INET6}, {
70   		"sctp", _NC_SCTP, AF_INET}, {
71   		"sctp6", _NC_SCTP6, AF_INET6}, {
72   		"udp", _NC_UDP, AF_INET}, {
73   		"udp6", _NC_UDP6, AF_INET6},};
74   	
/* Time limit of 3 seconds (tv_sec = 3, tv_nsec = 0) that rpc_cb_null()
 * and nfs_rpc_call() hand to clnt_req_setup(). */
static const struct timespec tout = { 3, 0 };
77   	
/**
 * @brief Initialize the callback credential cache
 *
 * Tries to create the credential cache directory (an already existing
 * directory is only reported), stores the configured cache directory
 * as the first ccachesearch entry and refreshes the machine
 * credential.  Every failure is logged and otherwise ignored.
 *
 * @param[in] ccache Location of credential cache
 */

#ifdef _HAVE_GSSAPI
static inline void nfs_rpc_cb_init_ccache(const char *ccache)
{
	int rc;

	if (mkdir(ccache, 0700) < 0) {
		if (errno != EEXIST) {
			LogWarn(COMPONENT_INIT,
				"Could not create credential cache directory: %s (%s)",
				ccache, strerror(errno));
		} else {
			LogEvent(COMPONENT_INIT,
				 "Callback creds directory (%s) already exists",
				 ccache);
		}
	}

	ccachesearch[0] = nfs_param.krb5_param.ccache_dir;

	rc = gssd_refresh_krb5_machine_credential(
				nfs_host_name, NULL,
				nfs_param.krb5_param.svc.principal);
	if (rc) {
		LogWarn(COMPONENT_INIT,
			"gssd_refresh_krb5_machine_credential failed (%d:%d)",
			rc, errno);
	}
}
#endif /* _HAVE_GSSAPI */
112  	
/**
 * @brief Initialize callback subsystem
 *
 * Without _HAVE_GSSAPI this is a no-op.  With it, the credential cache
 * is initialized from the configured ccache directory and a failing
 * gssd_check_mechs() is logged as critical; nothing is reported back
 * to the caller.
 */
void nfs_rpc_cb_pkginit(void)
{
#ifdef _HAVE_GSSAPI
	/* ccache comes first */
	nfs_rpc_cb_init_ccache(nfs_param.krb5_param.ccache_dir);

	/* sanity check GSSAPI */
	if (gssd_check_mechs()) {
		LogCrit(COMPONENT_INIT,
			"sanity check: gssd_check_mechs() failed");
	}
#endif /* _HAVE_GSSAPI */
}
128  	
/**
 * @brief Shutdown callback subsystem
 *
 * Currently there is nothing to tear down, so this returns immediately.
 */
void nfs_rpc_cb_pkgshutdown(void)
{
}
136  	
137  	/**
138  	 * @brief Convert a netid label
139  	 *
140  	 * @todo This is automatically redundant, but in fact upstream TI-RPC is
141  	 * not up-to-date with RFC 5665, will fix (Matt)
142  	 *
143  	 * @param[in] netid The netid label dictating the protocol
144  	 *
145  	 * @return The numerical protocol identifier.
146  	 */
147  	
148  	nc_type nfs_netid_to_nc(const char *netid)
149  	{
150  		if (!strcmp(netid, netid_nc_table[_NC_TCP6].netid))
151  			return _NC_TCP6;
152  	
153  		if (!strcmp(netid, netid_nc_table[_NC_TCP].netid))
154  			return _NC_TCP;
155  	
156  		if (!strcmp(netid, netid_nc_table[_NC_UDP6].netid))
157  			return _NC_UDP6;
158  	
159  		if (!strcmp(netid, netid_nc_table[_NC_UDP].netid))
160  			return _NC_UDP;
161  	
162  		if (!strcmp(netid, netid_nc_table[_NC_RDMA6].netid))
163  			return _NC_RDMA6;
164  	
165  		if (!strcmp(netid, netid_nc_table[_NC_RDMA].netid))
166  			return _NC_RDMA;
167  	
168  		if (!strcmp(netid, netid_nc_table[_NC_SCTP6].netid))
169  			return _NC_SCTP6;
170  	
171  		if (!strcmp(netid, netid_nc_table[_NC_SCTP].netid))
172  			return _NC_SCTP;
173  	
174  		return _NC_ERR;
175  	}
176  	
177  	/**
178  	 * @brief Convert string format address to sockaddr
179  	 *
180  	 * This function takes the host.port format used in the NFSv4.0
181  	 * clientaddr4 and converts it to a POSIX sockaddr structure stored in
182  	 * the callback information of the clientid.
183  	 *
184  	 * @param[in,out] clientid The clientid in which to store the sockaddr
185  	 * @param[in]     uaddr    na_r_addr from the clientaddr4
186  	 */
187  	
188  	static inline void setup_client_saddr(nfs_client_id_t *clientid,
189  					      const char *uaddr)
190  	{
191  		char addr_buf[SOCK_NAME_MAX + 1];
192  		uint32_t bytes[11];
193  		int code;
194  	
195  		assert(clientid->cid_minorversion == 0);
196  	
197  		memset(&clientid->cid_cb.v40.cb_addr.ss, 0,
198  		       sizeof(struct sockaddr_storage));
199  	
(1) Event branch_past_initialization: transfer of control bypasses initialization of:
(2) Event caretline: ^
(3) Event name_at_decl_position: variable "sin" (declared at line 210)
(4) Event name_at_decl_position: variable "sin6" (declared at line 241)
200  		switch (clientid->cid_cb.v40.cb_addr.nc) {
201  		case _NC_TCP:
202  		case _NC_RDMA:
203  		case _NC_SCTP:
204  		case _NC_UDP:
205  			/* IPv4 (ws inspired) */
206  			if (sscanf(uaddr, "%u.%u.%u.%u.%u.%u", &bytes[1], &bytes[2],
207  				   &bytes[3], &bytes[4], &bytes[5], &bytes[6]) != 6)
208  				return;
209  	
210  			struct sockaddr_in *sin = ((struct sockaddr_in *)
211  						   &clientid->cid_cb.v40.cb_addr.ss);
212  	
213  			snprintf(addr_buf, sizeof(addr_buf), "%u.%u.%u.%u",
214  				 bytes[1], bytes[2], bytes[3], bytes[4]);
215  	
216  			sin->sin_family = AF_INET;
217  			sin->sin_port = htons((bytes[5] << 8) | bytes[6]);
218  			code = inet_pton(AF_INET, addr_buf, &sin->sin_addr);
219  	
220  			if (code != 1)
221  				LogWarn(COMPONENT_NFS_CB, "inet_pton failed (%d %s)",
222  					code, addr_buf);
223  			else
224  				LogDebug(COMPONENT_NFS_CB,
225  					 "client callback addr:port %s:%d",
226  					 addr_buf, ntohs(sin->sin_port));
227  	
228  			break;
229  	
230  		case _NC_TCP6:
231  		case _NC_RDMA6:
232  		case _NC_SCTP6:
233  		case _NC_UDP6:
234  			/* IPv6 (ws inspired) */
235  			if (sscanf(uaddr, "%2x:%2x:%2x:%2x:%2x:%2x:%2x:%2x.%u.%u",
236  				   &bytes[1], &bytes[2], &bytes[3], &bytes[4],
237  				   &bytes[5], &bytes[6], &bytes[7], &bytes[8],
238  				   &bytes[9], &bytes[10]) != 10)
239  				return;
240  	
241  			struct sockaddr_in6 *sin6 = ((struct sockaddr_in6 *)
242  						     &clientid->cid_cb.v40.cb_addr.ss);
243  	
244  			snprintf(addr_buf, sizeof(addr_buf),
245  				 "%2x:%2x:%2x:%2x:%2x:%2x:%2x:%2x",
246  				 bytes[1], bytes[2], bytes[3], bytes[4], bytes[5],
247  				 bytes[6], bytes[7], bytes[8]);
248  	
249  			code = inet_pton(AF_INET6, addr_buf, &sin6->sin6_addr);
250  			sin6->sin6_port = htons((bytes[9] << 8) | bytes[10]);
251  			sin6->sin6_family = AF_INET6;
252  	
253  			if (code != 1)
254  				LogWarn(COMPONENT_NFS_CB,
255  					"inet_pton failed (%d %s)", code, addr_buf);
256  			else
257  				LogDebug(COMPONENT_NFS_CB,
258  					 "client callback addr:port %s:%d",
259  					 addr_buf, ntohs(sin6->sin6_port));
260  	
261  			break;
262  	
263  		default:
264  			/* unknown netid */
265  			break;
266  		};
267  	}
268  	
269  	/**
270  	 * @brief Set the callback location for an NFSv4.0 clientid
271  	 *
272  	 * @param[in,out] clientid The clientid in which to set the location
273  	 * @param[in]     addr4    The client's supplied callback address
274  	 */
275  	
276  	void nfs_set_client_location(nfs_client_id_t *clientid,
277  				     const clientaddr4 *addr4)
278  	{
279  		assert(clientid->cid_minorversion == 0);
280  		clientid->cid_cb.v40.cb_addr.nc = nfs_netid_to_nc(addr4->r_netid);
281  		strlcpy(clientid->cid_cb.v40.cb_client_r_addr, addr4->r_addr,
282  			SOCK_NAME_MAX);
283  		setup_client_saddr(clientid, clientid->cid_cb.v40.cb_client_r_addr);
284  	}
285  	
286  	/**
287  	 * @brief Get the fd of an NFSv4.0 callback connection
288  	 *
289  	 * @param[in]  clientid The clientid to query
290  	 * @param[out] fd       The file descriptor
291  	 * @param[out] proto    The protocol used on this connection
292  	 *
293  	 * @return 0 or values of errno.
294  	 */
295  	
296  	static inline int32_t nfs_clid_connected_socket(nfs_client_id_t *clientid,
297  							int *fd, int *proto)
298  	{
299  		int domain, sock_type, protocol, sock_size;
300  		int nfd;
301  		int code;
302  	
303  		assert(clientid->cid_minorversion == 0);
304  	
305  		*fd = 0;
306  		*proto = -1;
307  	
308  		switch (clientid->cid_cb.v40.cb_addr.nc) {
309  		case _NC_TCP:
310  		case _NC_TCP6:
311  			sock_type = SOCK_STREAM;
312  			protocol = IPPROTO_TCP;
313  			break;
314  		case _NC_UDP6:
315  		case _NC_UDP:
316  			sock_type = SOCK_DGRAM;
317  			protocol = IPPROTO_UDP;
318  			break;
319  		default:
320  			return EINVAL;
321  		}
322  	
323  		switch (clientid->cid_cb.v40.cb_addr.ss.ss_family) {
324  		case AF_INET:
325  			domain = PF_INET;
326  			sock_size = sizeof(struct sockaddr_in);
327  			break;
328  		case AF_INET6:
329  			domain = PF_INET6;
330  			sock_size = sizeof(struct sockaddr_in6);
331  			break;
332  		default:
333  			return EINVAL;
334  		}
335  	
336  		nfd = socket(domain, sock_type, protocol);
337  		if (nfd < 0) {
338  			code = errno;
339  			LogWarn(COMPONENT_NFS_CB,
340  				"socket failed %d (%s)", code, strerror(code));
341  			return code;
342  		}
343  	
344  		code = connect(nfd,
345  			       (struct sockaddr *)&clientid->cid_cb.v40.cb_addr.ss,
346  			       sock_size);
347  	
348  		if (code < 0) {
349  			code = errno;
350  			LogWarn(COMPONENT_NFS_CB, "connect fail errno %d (%s)",
351  				code, strerror(code));
352  			close(nfd);
353  			return code;
354  		}
355  	
356  		*proto = protocol;
357  		*fd = nfd;
358  	
359  		return code;
360  	}
361  	
362  	/* end refactorable RPC code */
363  	
364  	/**
365  	 * @brief Check if an authentication flavor is supported
366  	 *
367  	 * @param[in] flavor RPC authentication flavor
368  	 *
369  	 * @retval true if supported.
370  	 * @retval false if not.
371  	 */
372  	
373  	static inline bool supported_auth_flavor(int flavor)
374  	{
375  		switch (flavor) {
376  		case RPCSEC_GSS:
377  		case AUTH_SYS:
378  		case AUTH_NONE:
379  			return true;
380  		default:
381  			return false;
382  		};
383  	}
384  	
/**
 * @brief Kerberos OID
 *
 * This value comes from kerberos source, gssapi_krb5.c (Umich).
 *
 * The descriptor holds a length of 9 followed by the nine
 * octal-escaped bytes of the encoded object identifier
 * (1.2.840.113554.1.2.2).
 */

#ifdef _HAVE_GSSAPI
gss_OID_desc krb5oid = { 9, "\052\206\110\206\367\022\001\002\002" };
393  	
394  	/**
395  	 * @brief Format a principal name for an RPC call channel
396  	 *
397  	 * @param[in]  chan Call channel
398  	 * @param[out] buf  Buffer to hold formatted name
399  	 * @param[in]  len  Size of buffer
400  	 *
401  	 * @return The principle or NULL.
402  	 */
403  	
404  	static inline char *format_host_principal(rpc_call_channel_t *chan, char *buf,
405  						  size_t len)
406  	{
407  		char addr_buf[SOCK_NAME_MAX + 1];
408  		const char *host = NULL;
409  		void *sin;
410  	
411  		switch (chan->type) {
412  		case RPC_CHAN_V40:
413  			sin = &chan->source.clientid->cid_cb.v40.cb_addr.ss;
414  			break;
415  		default:
416  			return NULL;
417  		}
418  	
419  		switch (((struct sockaddr_in *)sin)->sin_family) {
420  		case AF_INET:
421  			host = inet_ntop(AF_INET,
422  					 &((struct sockaddr_in *)sin)->sin_addr,
423  					 addr_buf, INET_ADDRSTRLEN);
424  			break;
425  	
426  		case AF_INET6:
427  			host = inet_ntop(AF_INET6,
428  					 &((struct sockaddr_in6 *)sin)->sin6_addr,
429  					 addr_buf, INET6_ADDRSTRLEN);
430  			break;
431  	
432  		default:
433  			break;
434  		}
435  	
436  		if (host) {
437  			snprintf(buf, len, "nfs@%s", host);
438  			return buf;
439  		}
440  	
441  		return NULL;
442  	}
443  	#endif /* _HAVE_GSSAPI */
444  	
/**
 * @brief Set up GSS on a callback channel
 *
 * Copies the service and QOP from the client credential into the
 * channel, refreshes the machine credential and builds the target
 * principal from the client's address.  When the service is
 * RPCSEC_GSS_SVC_NONE an AUTH_NONE handle is returned instead of a GSS
 * one.
 *
 * If the credential refresh or the principal formatting fails, a dummy
 * handle is returned whose ah_error carries RPC_SYSTEMERROR and the
 * failure code.
 *
 * @param[in,out] chan Channel on which to set up GSS
 * @param[in]     cred GSS Credential
 *
 * @return The AUTH handle; check it with AUTH_FAILURE or AUTH_SUCCESS.
 */

#ifdef _HAVE_GSSAPI
static inline AUTH *nfs_rpc_callback_setup_gss(rpc_call_channel_t *chan,
					       nfs_client_cred_t *cred)
{
	char target[MAXPATHLEN + 1];
	AUTH *auth;
	int32_t err;

	assert(cred->flavor == RPCSEC_GSS);

	/* MUST RFC 3530bis, section 3.3.3 */
	chan->gss_sec.svc = cred->auth_union.auth_gss.svc;
	chan->gss_sec.qop = cred->auth_union.auth_gss.qop;

	/* the GSSAPI k5 mech needs to find an unexpired credential
	 * for nfs/hostname in an accessible k5ccache */
	err = gssd_refresh_krb5_machine_credential(
				nfs_host_name, NULL,
				nfs_param.krb5_param.svc.principal);
	if (err) {
		LogWarn(COMPONENT_NFS_CB,
			"gssd_refresh_krb5_machine_credential failed (%d:%d)",
			err, errno);
		goto fail;
	}

	if (format_host_principal(chan, target, sizeof(target)) == NULL) {
		err = errno;
		LogCrit(COMPONENT_NFS_CB, "format_host_principal failed");
		goto fail;
	}

	chan->gss_sec.cred = GSS_C_NO_CREDENTIAL;
	chan->gss_sec.req_flags = 0;

	if (chan->gss_sec.svc == RPCSEC_GSS_SVC_NONE)
		return authnone_ncreate();

	/* no more lipkey, spkm3 */
	chan->gss_sec.mech = (gss_OID) & krb5oid;
	chan->gss_sec.req_flags = GSS_C_MUTUAL_FLAG;	/* XXX */

	return authgss_ncreate_default(chan->clnt, target, &chan->gss_sec);

fail:
	auth = authnone_ncreate_dummy();
	auth->ah_error.re_status = RPC_SYSTEMERROR;
	auth->ah_error.re_errno = err;
	return auth;
}
#endif /* _HAVE_GSSAPI */
509  	
510  	/**
511  	 * @brief Create a channel for an NFSv4.0 client
512  	 *
513  	 * @param[in] clientid Client record
514  	 * @param[in] flags     Currently unused
515  	 *
516  	 * @return Status code.
517  	 */
518  	
519  	int nfs_rpc_create_chan_v40(nfs_client_id_t *clientid, uint32_t flags)
520  	{
521  		rpc_call_channel_t *chan = &clientid->cid_cb.v40.cb_chan;
522  		char *err;
523  		struct netbuf raddr;
524  		int fd;
525  		int proto;
526  		int code;
527  	
528  		assert(!chan->clnt);
529  		assert(clientid->cid_minorversion == 0);
530  	
531  		/* XXX we MUST error RFC 3530bis, sec. 3.3.3 */
532  		if (!supported_auth_flavor(clientid->cid_credential.flavor))
533  			return EINVAL;
534  	
535  		chan->type = RPC_CHAN_V40;
536  		chan->source.clientid = clientid;
537  	
538  		code = nfs_clid_connected_socket(clientid, &fd, &proto);
539  		if (code) {
540  			LogWarn(COMPONENT_NFS_CB, "Failed creating socket");
541  			return code;
542  		}
543  	
544  		raddr.buf = &clientid->cid_cb.v40.cb_addr.ss;
545  	
546  		switch (proto) {
547  		case IPPROTO_TCP:
548  			raddr.maxlen = raddr.len = sizeof(struct sockaddr_in);
549  			chan->clnt = clnt_vc_ncreatef(fd, &raddr,
550  						      clientid->cid_cb.v40.cb_program,
551  						      NFS_CB /* Errata ID: 2291 */,
552  						      0, 0,
553  						      CLNT_CREATE_FLAG_CLOSE |
554  						      CLNT_CREATE_FLAG_CONNECT);
555  			break;
556  		case IPPROTO_UDP:
557  			raddr.maxlen = raddr.len = sizeof(struct sockaddr_in6);
558  			chan->clnt = clnt_dg_ncreatef(fd, &raddr,
559  						      clientid->cid_cb.v40.cb_program,
560  						      NFS_CB /* Errata ID: 2291 */,
561  						      0, 0,
562  						      CLNT_CREATE_FLAG_CLOSE);
563  			break;
564  		default:
565  			break;
566  		}
567  	
568  		if (CLNT_FAILURE(chan->clnt)) {
569  			err = rpc_sperror(&chan->clnt->cl_error, "failed");
570  	
571  			LogDebug(COMPONENT_NFS_CB, "%s", err);
572  			gsh_free(err);
573  			CLNT_DESTROY(chan->clnt);
574  			chan->clnt = NULL;
575  			return EINVAL;
576  		}
577  	
578  		/* channel protection */
579  		switch (clientid->cid_credential.flavor) {
580  	#ifdef _HAVE_GSSAPI
581  		case RPCSEC_GSS:
582  			chan->auth = nfs_rpc_callback_setup_gss(chan,
583  							&clientid->cid_credential);
584  			break;
585  	#endif /* _HAVE_GSSAPI */
586  		case AUTH_SYS:
587  			chan->auth = authunix_ncreate_default();
588  			break;
589  		case AUTH_NONE:
590  			chan->auth = authnone_ncreate();
591  			break;
592  		default:
593  			return EINVAL;
594  		}
595  	
596  		if (AUTH_FAILURE(chan->auth)) {
597  			err = rpc_sperror(&chan->auth->ah_error, "failed");
598  	
599  			LogDebug(COMPONENT_NFS_CB, "%s", err);
600  			gsh_free(err);
601  			AUTH_DESTROY(chan->auth);
602  			chan->auth = NULL;
603  			return EINVAL;
604  		}
605  		return 0;
606  	}
607  	
608  	/**
609  	 * @brief Dispose of a channel
610  	 *
611  	 * The caller should hold the channel mutex.
612  	 *
613  	 * @param[in] chan The channel to dispose of
614  	 */
615  	static void _nfs_rpc_destroy_chan(rpc_call_channel_t *chan)
616  	{
617  		assert(chan);
618  	
619  		/* clean up auth, if any */
620  		if (chan->auth) {
621  			AUTH_DESTROY(chan->auth);
622  			chan->auth = NULL;
623  		}
624  	
625  		/* channel has a dedicated RPC client */
626  		if (chan->clnt) {
627  			/* destroy it */
628  			CLNT_DESTROY(chan->clnt);
629  			chan->clnt = NULL;
630  		}
631  	
632  		chan->last_called = 0;
633  	}
634  	
635  	/**
636  	 * Call the NFSv4 client's CB_NULL procedure.
637  	 *
638  	 * @param[in] chan    Channel on which to call
639  	 * @param[in] timeout Timeout for client call
640  	 * @param[in] locked  True if the channel is already locked
641  	 *
642  	 * @return Client status.
643  	 */
644  	
645  	static enum clnt_stat rpc_cb_null(rpc_call_channel_t *chan, bool locked)
646  	{
647  		struct clnt_req *cc;
648  		enum clnt_stat stat;
649  	
650  		/* XXX TI-RPC does the signal masking */
651  		if (!locked)
652  			PTHREAD_MUTEX_lock(&chan->mtx);
653  	
654  		if (!chan->clnt) {
655  			stat = RPC_INTR;
656  			goto unlock;
657  		}
658  	
659  		cc = gsh_malloc(sizeof(*cc));
660  		clnt_req_fill(cc, chan->clnt, chan->auth, CB_NULL,
661  			      (xdrproc_t) xdr_void, NULL,
662  			      (xdrproc_t) xdr_void, NULL);
663  		stat = clnt_req_setup(cc, tout);
664  		if (stat == RPC_SUCCESS) {
665  			cc->cc_refreshes = 1;
666  			stat = CLNT_CALL_WAIT(cc);
667  		}
668  		clnt_req_release(cc);
669  	
670  		/* If a call fails, we have to assume path down, or equally fatal
671  		 * error.  We may need back-off. */
672  		if (stat != RPC_SUCCESS)
673  			_nfs_rpc_destroy_chan(chan);
674  	
675  	 unlock:
676  		if (!locked)
677  			PTHREAD_MUTEX_unlock(&chan->mtx);
678  	
679  		return stat;
680  	}
681  	
/**
 * @brief Create a channel for an NFSv4.1 session
 *
 * This function creates a channel on an NFSv4.1 session, using the
 * given security parameters.  The whole operation runs with the
 * channel mutex held.
 *
 * If the channel already has a client, EEXIST is returned and, because
 * the common error exit destroys any client present, the existing
 * channel is torn down; no replacement is created in the same call.
 *
 * The security parameters are tried in order and the first one that
 * yields an auth handle wins.  RPCSEC_GSS entries and unknown flavors
 * are skipped.
 *
 * @param[in]     xprt          Transport the back channel is created
 *                              on; must not be NULL, RDMA transports
 *                              are refused
 * @param[in,out] session       The session on which to create the
 *                              back channel
 * @param[in]     num_sec_parms Length of sec_parms list
 * @param[in]     sec_parms     Allowable security parameters
 *
 * @return 0 or POSIX error code (EEXIST, EINVAL or EPERM).
 */

int nfs_rpc_create_chan_v41(SVCXPRT *xprt, nfs41_session_t *session,
			    int num_sec_parms, callback_sec_parms4 *sec_parms)
{
	rpc_call_channel_t *chan = &session->cb_chan;
	char *err;
	int i;
	int code = 0;
	bool authed = false;

	PTHREAD_MUTEX_lock(&chan->mtx);

	if (chan->clnt) {
		/* Something better later. */
		/* The error exit below destroys this existing client. */
		code = EEXIST;
		goto out;
	}

	chan->type = RPC_CHAN_V41;
	chan->source.session = session;

	assert(xprt);

	if (svc_get_xprt_type(xprt) == XPRT_RDMA) {
		LogWarn(COMPONENT_NFS_CB,
			"refusing to create back channel over RDMA for now");
		code = EINVAL;
		goto out;
	}

	/* connect an RPC client
	 * Use version 1 per errata ID 2291 for RFC 5661
	 */
	chan->clnt = clnt_vc_ncreate_svc(xprt, session->cb_program,
					 NFS_CB /* Errata ID: 2291 */,
					 CLNT_CREATE_FLAG_NONE);

	if (CLNT_FAILURE(chan->clnt)) {
		err = rpc_sperror(&chan->clnt->cl_error, "failed");

		LogDebug(COMPONENT_NFS_CB, "%s", err);
		gsh_free(err);
		CLNT_DESTROY(chan->clnt);
		chan->clnt = NULL;
		code = EINVAL;
		goto out;
	}

	/* Try each offered security flavor in order */
	for (i = 0; i < num_sec_parms; ++i) {
		if (sec_parms[i].cb_secflavor == AUTH_NONE) {
			/* accepted without checking the handle */
			chan->auth = authnone_ncreate();
			authed = true;
			break;
		} else if (sec_parms[i].cb_secflavor == AUTH_SYS) {
			struct authunix_parms *sys_parms =
			    &sec_parms[i].callback_sec_parms4_u.cbsp_sys_cred;

			chan->auth = authunix_ncreate(sys_parms->aup_machname,
						      sys_parms->aup_uid,
						      sys_parms->aup_gid,
						      sys_parms->aup_len,
						      sys_parms->aup_gids);
			if (AUTH_SUCCESS(chan->auth)) {
				authed = true;
				break;
			}
		} else if (sec_parms[i].cb_secflavor == RPCSEC_GSS) {

			/**
			 * @todo ACE: Come back later and implement
			 * GSS.
			 */
			continue;
		} else {
			LogMajor(COMPONENT_NFS_CB,
				 "Client sent unknown auth type.");
			continue;
		}
		/* Only a failed AUTH_SYS creation reaches this point:
		 * log it, drop the handle and try the next entry. */
		err = rpc_sperror(&chan->auth->ah_error, "failed");

		LogDebug(COMPONENT_NFS_CB, "%s", err);
		gsh_free(err);
		AUTH_DESTROY(chan->auth);
		chan->auth = NULL;
	}

	if (!authed) {
		code = EPERM;
		LogMajor(COMPONENT_NFS_CB, "No working auth in sec_params.");
		goto out;
	}

	/* mark the session's back channel as up */
	atomic_set_uint32_t_bits(&session->flags, session_bc_up);

 out:
	/* Common exit: on error, tear down whatever client exists */
	if (code != 0) {
		LogWarn(COMPONENT_NFS_CB,
			"can not create back channel, code %d", code);
		if (chan->clnt)
			_nfs_rpc_destroy_chan(chan);
	}

	PTHREAD_MUTEX_unlock(&chan->mtx);

	return code;
}
802  	
803  	/**
804  	 * @brief Get a backchannel for a clientid
805  	 *
806  	 * This function works for both NFSv4.0 and NFSv4.1.  For NFSv4.0, if
807  	 * the channel isn't up, it tries to create it.
808  	 *
809  	 * @param[in,out] clientid The clientid to use
810  	 * @param[out]    flags    Unused
811  	 *
812  	 * @return The back channel or NULL if none existed or could be
813  	 *         established.
814  	 */
815  	rpc_call_channel_t *nfs_rpc_get_chan(nfs_client_id_t *clientid, uint32_t flags)
816  	{
817  		rpc_call_channel_t *chan;
818  		struct glist_head *glist;
819  		nfs41_session_t *session;
820  	
821  		if (clientid->cid_minorversion == 0) {
822  			chan = &clientid->cid_cb.v40.cb_chan;
823  			if (!chan->clnt) {
824  				if (nfs_rpc_create_chan_v40(clientid, flags)) {
825  					chan = NULL;
826  				}
827  			}
828  			return chan;
829  		}
830  	
831  		/* Get the first working back channel we have */
832  		chan = NULL;
833  		pthread_mutex_lock(&clientid->cid_mutex);
834  		glist_for_each(glist, &clientid->cid_cb.v41.cb_session_list) {
835  			session = glist_entry(glist, nfs41_session_t, session_link);
836  			if (atomic_fetch_uint32_t(&session->flags) & session_bc_up) {
837  				chan = &session->cb_chan;
838  				break;
839  			}
840  		}
841  		pthread_mutex_unlock(&clientid->cid_mutex);
842  	
843  		return chan;
844  	}
845  	
846  	/**
847  	 * @brief Dispose of a channel
848  	 *
849  	 * @param[in] chan The channel to dispose of
850  	 */
851  	void nfs_rpc_destroy_chan(rpc_call_channel_t *chan)
852  	{
853  		assert(chan);
854  	
855  		PTHREAD_MUTEX_lock(&chan->mtx);
856  	
857  		_nfs_rpc_destroy_chan(chan);
858  	
859  		PTHREAD_MUTEX_unlock(&chan->mtx);
860  	}
861  	
/**
 * @brief Free callback arguments
 *
 * Only the memory @c op itself points to is released, via gsh_free();
 * nothing referenced from inside it is touched here.
 *
 * @param[in] op The argop to free
 */

static inline void free_argop(nfs_cb_argop4 *op)
{
	gsh_free(op);
}
872  	
/**
 * @brief Free callback result
 *
 * Only the memory @c op itself points to is released, via gsh_free();
 * nothing referenced from inside it is touched here.
 *
 * @param[in] op The resop to free
 */

static inline void free_resop(nfs_cb_resop4 *op)
{
	gsh_free(op);
}
883  	
884  	/**
885  	 * @brief Allocate an RPC call
886  	 *
887  	 * @return The newly allocated call or NULL.
888  	 */
889  	
890  	struct _rpc_call *alloc_rpc_call(void)
891  	{
892  		struct _rpc_call *call = gsh_calloc(1, sizeof(struct _rpc_call));
893  	
894  		(void) atomic_inc_uint64_t(&nfs_health_.enqueued_reqs);
895  	
896  		return call;
897  	}
898  	
899  	/**
900  	 * @brief Free an RPC call
901  	 *
902  	 * @param[in] call The call to free
903  	 */
904  	void free_rpc_call(rpc_call_t *call)
905  	{
906  		free_argop(call->cbt.v_u.v4.args.argarray.argarray_val);
907  		free_resop(call->cbt.v_u.v4.res.resarray.resarray_val);
908  	
909  		clnt_req_release(&call->call_req);
910  	}
911  	
912  	/**
913  	 * @brief Free the RPC call context
914  	 *
915  	 * @param[in] cc The call context to free
916  	 */
917  	static void nfs_rpc_call_free(struct clnt_req *cc, size_t unused)
918  	{
919  		rpc_call_t *call = container_of(cc, struct _rpc_call, call_req);
920  	
921  		gsh_free(call);
922  		(void) atomic_inc_uint64_t(&nfs_health_.dequeued_reqs);
923  	}
924  	
925  	/**
926  	 * @brief Call response processing
927  	 *
928  	 * @param[in] cc  The RPC call request context
929  	 */
930  	static void nfs_rpc_call_process(struct clnt_req *cc)
931  	{
932  		rpc_call_t *call = container_of(cc, rpc_call_t, call_req);
933  	
934  		/* always TCP for retries, cc_refreshes only for AUTH_REFRESH()
935  		 */
936  		if (cc->cc_error.re_status == RPC_AUTHERROR
937  		 && cc->cc_refreshes-- > 0
938  		 && AUTH_REFRESH(cc->cc_auth, NULL)) {
939  			if (clnt_req_refresh(cc) == RPC_SUCCESS) {
940  				cc->cc_error.re_status = CLNT_CALL_BACK(cc);
941  				return;
942  			}
943  		}
944  	
945  		call->states |= NFS_CB_CALL_FINISHED;
946  	
947  		if (call->call_hook)
948  			call->call_hook(call);
949  	
950  		free_rpc_call(call);
951  	}
952  	
/**
 * @brief Dispatch a call
 *
 * Fills the call's embedded request as a CB_COMPOUND on the call's
 * channel and submits it with CLNT_CALL_BACK(), with
 * nfs_rpc_call_process() installed as the processing callback.  The
 * channel mutex is held for the whole submission.
 *
 * If the channel has no client, nothing is sent and RPC_INTR is
 * returned; the call state is left at NFS_CB_CALL_DISPATCH.  Any other
 * non-successful status destroys the channel and marks the call
 * NFS_CB_CALL_ABORTED.
 *
 * @param[in,out] call  The call to dispatch
 * @param[in]     flags Flags governing call (not read in this function)
 *
 * @return enum clnt_stat.
 */

enum clnt_stat nfs_rpc_call(rpc_call_t *call, uint32_t flags)
{
	struct clnt_req *cc = &call->call_req;

	call->states = NFS_CB_CALL_DISPATCH;

	/* XXX TI-RPC does the signal masking */
	PTHREAD_MUTEX_lock(&call->chan->mtx);

	clnt_req_fill(cc, call->chan->clnt, call->chan->auth, CB_COMPOUND,
		      (xdrproc_t) xdr_CB_COMPOUND4args, &call->cbt.v_u.v4.args,
		      (xdrproc_t) xdr_CB_COMPOUND4res, &call->cbt.v_u.v4.res);
	/* NOTE(review): the size recorded is that of nfs_request_t although
	 * the request is embedded in an rpc_call_t - confirm intended. */
	cc->cc_size = sizeof(nfs_request_t);
	cc->cc_free_cb = nfs_rpc_call_free;

	/* No client on the channel: report RPC_INTR without sending */
	if (!call->chan->clnt) {
		cc->cc_error.re_status = RPC_INTR;
		goto unlock;
	}
	if (clnt_req_setup(cc, tout) == RPC_SUCCESS) {
		cc->cc_process_cb = nfs_rpc_call_process;
		cc->cc_error.re_status = CLNT_CALL_BACK(cc);
	}

	/* If a call fails, we have to assume path down, or equally fatal
	 * error.  We may need back-off. */
	if (cc->cc_error.re_status != RPC_SUCCESS) {
		_nfs_rpc_destroy_chan(call->chan);
		call->states |= NFS_CB_CALL_ABORTED;
	}

 unlock:
	/* NOTE(review): call and cc are still dereferenced from here on;
	 * presumably the request cannot be released by the processing
	 * callback before this function returns - confirm. */
	PTHREAD_MUTEX_unlock(&call->chan->mtx);

	/* any broadcast or signalling done in completion function */
	return cc->cc_error.re_status;
}
999  	
1000 	/**
1001 	 * @brief Abort a call
1002 	 *
1003 	 * @param[in] call The call to abort
1004 	 *
1005 	 * @todo function doesn't seem to do anything.
1006 	 *
1007 	 * @return But it does it successfully.
1008 	 */
1009 	
1010 	int32_t nfs_rpc_abort_call(rpc_call_t *call)
1011 	{
1012 		return 0;
1013 	}
1014 	
1015 	/**
1016 	 * @brief Construct a CB_COMPOUND for v41
1017 	 *
1018 	 * This function constructs a compound with a CB_SEQUENCE and one
1019 	 * other operation.
1020 	 *
1021 	 * @param[in] session Session on whose back channel we make the call
1022 	 * @param[in] op      The operation to add
1023 	 * @param[in] refer   Referral data, NULL if none
1024 	 * @param[in] slot    Slot number to use
1025 	 *
1026 	 * @return The constructed call or NULL.
1027 	 */
1028 	static rpc_call_t *construct_v41(nfs41_session_t *session,
1029 					 nfs_cb_argop4 *op,
1030 					 struct state_refer *refer,
1031 					 slotid4 slot, slotid4 highest_slot)
1032 	{
1033 		rpc_call_t *call = alloc_rpc_call();
1034 		nfs_cb_argop4 sequenceop;
1035 		CB_SEQUENCE4args *sequence = &sequenceop.nfs_cb_argop4_u.opcbsequence;
1036 		const uint32_t minor = session->clientid_record->cid_minorversion;
1037 	
1038 		call->chan = &session->cb_chan;
1039 		cb_compound_init_v4(&call->cbt, 2, minor, 0, NULL, 0);
1040 	
1041 		memset(sequence, 0, sizeof(CB_SEQUENCE4args));
1042 		sequenceop.argop = NFS4_OP_CB_SEQUENCE;
1043 	
1044 		memcpy(sequence->csa_sessionid, session->session_id,
1045 		       NFS4_SESSIONID_SIZE);
1046 		sequence->csa_sequenceid = session->bc_slots[slot].sequence;
1047 		sequence->csa_slotid = slot;
1048 		sequence->csa_highest_slotid = highest_slot;
1049 		sequence->csa_cachethis = false;
1050 	
1051 		if (refer) {
1052 			referring_call_list4 *list;
1053 			referring_call4 *ref_call = NULL;
1054 	
1055 			list = gsh_calloc(1, sizeof(referring_call_list4));
1056 	
1057 			ref_call = gsh_malloc(sizeof(referring_call4));
1058 	
1059 			sequence->csa_referring_call_lists.csarcl_len = 1;
1060 			sequence->csa_referring_call_lists.csarcl_val = list;
1061 			memcpy(list->rcl_sessionid, refer->session,
1062 			       sizeof(NFS4_SESSIONID_SIZE));
1063 			list->rcl_referring_calls.rcl_referring_calls_len = 1;
1064 			list->rcl_referring_calls.rcl_referring_calls_val = ref_call;
1065 			ref_call->rc_sequenceid = refer->sequence;
1066 			ref_call->rc_slotid = refer->slot;
1067 		} else {
1068 			sequence->csa_referring_call_lists.csarcl_len = 0;
1069 			sequence->csa_referring_call_lists.csarcl_val = NULL;
1070 		}
1071 		cb_compound_add_op(&call->cbt, &sequenceop);
1072 		cb_compound_add_op(&call->cbt, op);
1073 	
1074 		return call;
1075 	}
1076 	
1077 	/**
1078 	 * @brief Free a CB sequence for v41
1079 	 *
1080 	 * @param[in] call The call to free
1081 	 */
1082 	static void release_v41(rpc_call_t *call)
1083 	{
1084 		nfs_cb_argop4 *argarray_val =
1085 			call->cbt.v_u.v4.args.argarray.argarray_val;
1086 		CB_SEQUENCE4args *sequence =
1087 			&argarray_val[0].nfs_cb_argop4_u.opcbsequence;
1088 		referring_call_list4 *call_lists =
1089 			sequence->csa_referring_call_lists.csarcl_val;
1090 	
1091 		if (call_lists == NULL)
1092 			return;
1093 	
1094 		gsh_free(call_lists->rcl_referring_calls.rcl_referring_calls_val);
1095 		gsh_free(call_lists);
1096 	}
1097 	
1098 	/**
1099 	 * @brief Find a callback slot
1100 	 *
1101 	 * Find and reserve a slot, if we can.  If @c wait is set to true, we
1102 	 * wait on the condition variable for a limited time.
1103 	 *
1104 	 * @param[in,out] session      Sesson on which to operate
1105 	 * @param[in]     wait         Whether to wait on the condition variable if
1106 	 *                             no slot can be found
1107 	 * @param[out]    slot         Slot to use
1108 	 * @param[out]    highest_slot Highest slot in use
1109 	 *
1110 	 * @retval false if a slot was not found.
1111 	 * @retval true if a slot was found.
1112 	 */
1113 	static bool find_cb_slot(nfs41_session_t *session, bool wait, slotid4 *slot,
1114 				 slotid4 *highest_slot)
1115 	{
1116 		slotid4 cur = 0;
1117 		bool found = false;
1118 	
1119 		PTHREAD_MUTEX_lock(&session->cb_mutex);
1120 	 retry:
1121 		for (cur = 0;
1122 		     cur < MIN(session->back_channel_attrs.ca_maxrequests,
1123 			       session->nb_slots);
1124 		     ++cur) {
1125 	
1126 			if (!(session->bc_slots[cur].in_use) && (!found)) {
1127 				found = true;
1128 				*slot = cur;
1129 				*highest_slot = cur;
1130 			}
1131 	
1132 			if (session->bc_slots[cur].in_use)
1133 				*highest_slot = cur;
1134 		}
1135 	
1136 		if (!found && wait) {
1137 			struct timespec ts;
1138 			bool woke = false;
1139 	
1140 			clock_gettime(CLOCK_REALTIME, &ts);
1141 			timespec_addms(&ts, 100);
1142 	
1143 			woke = (pthread_cond_timedwait(&session->cb_cond,
1144 						       &session->cb_mutex,
1145 						       &ts) != ETIMEDOUT);
1146 			if (woke) {
1147 				wait = false;
1148 				goto retry;
1149 			}
1150 		}
1151 	
1152 		if (found) {
1153 			session->bc_slots[*slot].in_use = true;
1154 			++session->bc_slots[*slot].sequence;
1155 			assert(*slot < session->back_channel_attrs.ca_maxrequests);
1156 		}
1157 	
1158 		PTHREAD_MUTEX_unlock(&session->cb_mutex);
1159 		return found;
1160 	}
1161 	
1162 	/**
1163 	 * @brief Release a reserved callback slot and wake waiters
1164 	 *
1165 	 * @param[in,out] session Session holding slot to release
1166 	 * @param[in]     slot    Slot to release
1167 	 * @param[in]     bool    Whether the operation was ever sent
1168 	 */
1169 	
1170 	static void release_cb_slot(nfs41_session_t *session, slotid4 slot, bool sent)
1171 	{
1172 		PTHREAD_MUTEX_lock(&session->cb_mutex);
1173 		session->bc_slots[slot].in_use = false;
1174 		if (!sent)
1175 			--session->bc_slots[slot].sequence;
1176 		pthread_cond_broadcast(&session->cb_cond);
1177 		PTHREAD_MUTEX_unlock(&session->cb_mutex);
1178 	}
1179 	
1180 	static int nfs_rpc_v41_single(nfs_client_id_t *clientid, nfs_cb_argop4 *op,
1181 			       struct state_refer *refer,
1182 			       void (*completion)(rpc_call_t *),
1183 			       void *completion_arg)
1184 	{
1185 		struct glist_head *glist;
1186 		int ret = ENOTCONN;
1187 		bool wait = false;
1188 	
1189 	restart:
1190 		pthread_mutex_lock(&clientid->cid_mutex);
1191 		glist_for_each(glist, &clientid->cid_cb.v41.cb_session_list) {
1192 			nfs41_session_t *scur, *session;
1193 			slotid4 slot = 0;
1194 			slotid4 highest_slot = 0;
1195 			rpc_call_t *call = NULL;
1196 	
1197 			scur = glist_entry(glist, nfs41_session_t, session_link);
1198 	
1199 			/*
1200 			 * This is part of the infinite loop avoidance. When we
1201 			 * attempt to use a session and that fails, we clear the
1202 			 * session_bc_up flag.  Then, we can avoid that session until
1203 			 * the backchannel has been reestablished.
1204 			 */
1205 			if (!(atomic_fetch_uint32_t(&scur->flags) & session_bc_up)) {
1206 				LogDebug(COMPONENT_NFS_CB, "bc is down");
1207 				continue;
1208 			}
1209 	
1210 			/*
1211 			 * We get a slot before we try to get a reference to the
1212 			 * session, which is odd, but necessary, as we can't hold
1213 			 * the cid_mutex when we go to put the session reference.
1214 			 */
1215 			if (!(find_cb_slot(scur, wait, &slot, &highest_slot))) {
1216 				LogDebug(COMPONENT_NFS_CB, "can't get slot");
1217 				continue;
1218 			}
1219 	
1220 			/*
1221 			 * Get a reference to the session.
1222 			 *
1223 			 * @todo: We don't really need to do the hashtable lookup
1224 			 * here since we have a pointer, but it's currently the only
1225 			 * safe way to get a reference.
1226 			 */
1227 			if (!nfs41_Session_Get_Pointer(scur->session_id, &session)) {
1228 				release_cb_slot(scur, slot, false);
1229 				continue;
1230 			}
1231 	
1232 			assert(session == scur);
1233 	
1234 			/* Drop mutex since we have a session ref */
1235 			pthread_mutex_unlock(&clientid->cid_mutex);
1236 	
1237 			call = construct_v41(session, op, refer, slot, highest_slot);
1238 	
1239 			call->call_hook = completion;
1240 			call->call_arg = completion_arg;
1241 			ret = nfs_rpc_call(call, NFS_RPC_CALL_NONE);
1242 			if (ret == 0)
1243 				return 0;
1244 	
1245 			/*
1246 			 * Tear down channel since there is likely something
1247 			 * wrong with it.
1248 			 */
1249 			LogDebug(COMPONENT_NFS_CB, "nfs_rpc_call failed: %d",
1250 					ret);
1251 			atomic_clear_uint32_t_bits(&session->flags, session_bc_up);
1252 	
1253 			release_v41(call);
1254 			free_rpc_call(call);
1255 	
1256 			release_cb_slot(session, slot, false);
1257 			dec_session_ref(session);
1258 			goto restart;
1259 		}
1260 		pthread_mutex_unlock(&clientid->cid_mutex);
1261 	
1262 		/* If it didn't work, then try again and wait on a slot */
1263 		if (ret && !wait) {
1264 			wait = true;
1265 			goto restart;
1266 		}
1267 	
1268 		return ret;
1269 	}
1270 	
1271 	/**
1272 	 * @brief Free information associated with any 'single' call
1273 	 */
1274 	
1275 	void nfs41_release_single(rpc_call_t *call)
1276 	{
1277 		release_cb_slot(call->chan->source.session,
1278 				call->cbt.v_u.v4.args.argarray.argarray_val[0]
1279 				.nfs_cb_argop4_u.opcbsequence.csa_slotid, true);
1280 		dec_session_ref(call->chan->source.session);
1281 		release_v41(call);
1282 	}
1283 	
1284 	/**
1285 	 * @brief test the state of callback channel for a clientid using NULL.
1286 	 * @return  enum clnt_stat
1287 	 */
1288 	
1289 	enum clnt_stat nfs_test_cb_chan(nfs_client_id_t *clientid)
1290 	{
1291 		rpc_call_channel_t *chan;
1292 		enum clnt_stat stat;
1293 		int retries = 1;
1294 	
1295 		/* create (fix?) channel */
1296 		do {
1297 			chan = nfs_rpc_get_chan(clientid, NFS_RPC_FLAG_NONE);
1298 			if (!chan) {
1299 				LogCrit(COMPONENT_NFS_CB, "nfs_rpc_get_chan failed");
1300 				return RPC_SYSTEMERROR;
1301 			}
1302 	
1303 			if (!chan->clnt) {
1304 				LogCrit(COMPONENT_NFS_CB,
1305 					"nfs_rpc_get_chan failed (no clnt)");
1306 				return RPC_SYSTEMERROR;
1307 			}
1308 	
1309 			if (!chan->auth) {
1310 				LogCrit(COMPONENT_NFS_CB,
1311 					"nfs_rpc_get_chan failed (no auth)");
1312 				return RPC_SYSTEMERROR;
1313 			}
1314 	
1315 			/* try the CB_NULL proc -- inline here, should be ok-ish */
1316 			stat = rpc_cb_null(chan, false);
1317 			LogDebug(COMPONENT_NFS_CB,
1318 				"rpc_cb_null on client %p returns %d", clientid, stat);
1319 	
1320 			/* RPC_INTR indicates that we should refresh the
1321 			 * channel and retry */
1322 		} while (stat == RPC_INTR && retries-- > 0);
1323 	
1324 		return stat;
1325 	}
1326 	
1327 	static int nfs_rpc_v40_single(nfs_client_id_t *clientid, nfs_cb_argop4 *op,
1328 			       void (*completion)(rpc_call_t *),
1329 			       void *completion_arg)
1330 	{
1331 		rpc_call_channel_t *chan;
1332 		rpc_call_t *call;
1333 		int rc;
1334 	
1335 		/* Attempt a recall only if channel state is UP */
1336 		if (get_cb_chan_down(clientid)) {
1337 			LogCrit(COMPONENT_NFS_CB,
1338 				"Call back channel down, not issuing a recall");
1339 			return ENOTCONN;
1340 		}
1341 	
1342 		chan = nfs_rpc_get_chan(clientid, NFS_RPC_FLAG_NONE);
1343 		if (!chan) {
1344 			LogCrit(COMPONENT_NFS_CB, "nfs_rpc_get_chan failed");
1345 			/* TODO: move this to nfs_rpc_get_chan ? */
1346 			set_cb_chan_down(clientid, true);
1347 			return ENOTCONN;
1348 		}
1349 		if (!chan->clnt) {
1350 			LogCrit(COMPONENT_NFS_CB, "nfs_rpc_get_chan failed (no clnt)");
1351 			set_cb_chan_down(clientid, true);
1352 			return ENOTCONN;
1353 		}
1354 		if (!chan->auth) {
1355 			LogCrit(COMPONENT_NFS_CB, "nfs_rpc_get_chan failed (no auth)");
1356 			set_cb_chan_down(clientid, true);
1357 			return ENOTCONN;
1358 		}
1359 	
1360 		call = alloc_rpc_call();
1361 		call->chan = chan;
1362 		cb_compound_init_v4(&call->cbt, 1, 0,
1363 				    clientid->cid_cb.v40.cb_callback_ident, NULL, 0);
1364 		cb_compound_add_op(&call->cbt, op);
1365 		call->call_hook = completion;
1366 		call->call_arg = completion_arg;
1367 	
1368 		rc = nfs_rpc_call(call, NFS_RPC_CALL_NONE);
1369 		if (rc)
1370 			free_rpc_call(call);
1371 		return rc;
1372 	}
1373 	
1374 	/**
1375 	 * @brief Send CB_COMPOUND with a single operation
1376 	 *
1377 	 * In the case of v4.1+, this actually sends two opearations, a CB_SEQUENCE
1378 	 * and the supplied operation.  It works as a convenience function to handle
1379 	 * the details of callback management, finding a connection with a working
1380 	 * back channel, and so forth.
1381 	 *
1382 	 * @note This should work for most practical purposes, but is not
1383 	 * ideal.  What we ought to have is a per-clientid queue that
1384 	 * operations can be submitted to that will be sent when a
1385 	 * back-channel is re-established, with a per-session queue for
1386 	 * operations that were sent but had the back-channel fail before the
1387 	 * response was received.
1388 	 *
1389 	 * @param[in] clientid       Client record
1390 	 * @param[in] op             The operation to perform
1391 	 * @param[in] refer          Referral tracking info (or NULL)
1392 	 * @param[in] completion     Completion function for this operation
1393 	 * @param[in] c_arg          Argument provided to completion hook
1394 	 *
1395 	 * @return POSIX error codes.
1396 	 */
1397 	int nfs_rpc_cb_single(nfs_client_id_t *clientid, nfs_cb_argop4 *op,
1398 			       struct state_refer *refer,
1399 			       void (*completion)(rpc_call_t *),
1400 			       void *c_arg)
1401 	{
1402 		if (clientid->cid_minorversion == 0)
1403 			return nfs_rpc_v40_single(clientid, op, completion, c_arg);
1404 		return nfs_rpc_v41_single(clientid, op, refer, completion, c_arg);
1405 	}
1406