1    	/*
2    	 * vim:noexpandtab:shiftwidth=8:tabstop=8:
3    	 *
4    	 * Copyright (C) Max Matveev, 2012
5    	 * Copyright CEA/DAM/DIF  (2008)
6    	 *
7    	 * contributeur : Philippe DENIEL   philippe.deniel@cea.fr
8    	 *                Thomas LEIBOVICI  thomas.leibovici@cea.fr
9    	 *
10   	 * This program is free software; you can redistribute it and/or
11   	 * modify it under the terms of the GNU Lesser General Public
12   	 * License as published by the Free Software Foundation; either
13   	 * version 3 of the License, or (at your option) any later version.
14   	 *
15   	 * This program is distributed in the hope that it will be useful,
16   	 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17   	 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18   	 * Lesser General Public License for more details.
19   	 *
20   	 * You should have received a copy of the GNU Lesser General Public
21   	 * License along with this library; if not, write to the Free Software
22   	 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
23   	 * 02110-1301 USA
24   	 */
25   	
26   	/* Proxy handle methods */
27   	
28   	#include "config.h"
29   	
30   	#include "fsal.h"
31   	#include <assert.h>
32   	#include <pthread.h>
33   	#include <arpa/inet.h>
34   	#include <sys/poll.h>
35   	#include <netdb.h>
36   	#include <urcu-bp.h>
37   	
38   	#include "gsh_list.h"
39   	#include "abstract_atomic.h"
40   	#include "fsal_types.h"
41   	#include "FSAL/fsal_commonlib.h"
42   	#include "pxy_fsal_methods.h"
43   	#include "fsal_nfsv4_macros.h"
44   	#include "nfs_core.h"
45   	#include "nfs_proto_functions.h"
46   	#include "nfs_proto_tools.h"
47   	#include "export_mgr.h"
48   	#include "common_utils.h"
49   	
50   	#define FSAL_PROXY_NFS_V4 4
51   	#define FSAL_PROXY_NFS_V4_MINOR 1
52   	#define NB_RPC_SLOT 16
53   	#define NB_MAX_OPERATIONS 10
54   	
55   	/* NB! nfs_prog is just an easy way to get this info into the call
56   	 *     It should really be fetched via export pointer */
/**
 * An RPC I/O context doubles as an NFSv4.1 session slot: each context
 * carries its own slot id and sequence id.
 */
struct pxy_rpc_io_context {
	/* protects iodone and ioresult; used together with iowait */
	pthread_mutex_t iolock;
	/* signalled once iodone has been set for this context */
	pthread_cond_t iowait;
	/* list linkage: on the export's free_contexts list while idle,
	 * on its rpc_calls list while a sent call awaits its reply */
	struct glist_head calls;
	/* xid of the call being sent; replies are matched against it */
	uint32_t rpc_xid;
	/* set by the receiving side when a result is available, cleared
	 * by the waiter once it has picked the result up */
	bool iodone;
	/* number of reply bytes stored in recvbuf, or a negative errno */
	int ioresult;
	/* RPC program number placed in the call header */
	unsigned int nfs_prog;
	/* sizes associated with sendbuf and recvbuf; a reply larger than
	 * recvbuf_sz is refused */
	unsigned int sendbuf_sz;
	unsigned int recvbuf_sz;
	/* outgoing message: 4 byte record mark followed by the XDR call */
	char *sendbuf;
	/* incoming reply, starting with the xid */
	char *recvbuf;
	/* slot id and last sequence id copied into a leading SEQUENCE op */
	slotid4 slotid;
	sequenceid4 seqid;
};
75   	
76   	/* Use this to estimate storage requirements for fattr4 blob */
struct pxy_fattr_storage {
	/* one member per attribute type the proxy deals with */
	fattr4_type type;
	fattr4_change change_time;
	fattr4_size size;
	fattr4_fsid fsid;
	fattr4_filehandle filehandle;
	fattr4_fileid fileid;
	fattr4_mode mode;
	fattr4_numlinks numlinks;
	fattr4_owner owner;
	fattr4_owner_group owner_group;
	fattr4_space_used space_used;
	fattr4_time_access time_access;
	fattr4_time_metadata time_metadata;
	fattr4_time_modify time_modify;
	fattr4_rawdev rawdev;
	/* NOTE(review): the pad* arrays look like room reserved for the
	 * variable-length owner, group and file handle payloads - confirm */
	char padowner[MAXNAMLEN + 1];
	char padgroup[MAXNAMLEN + 1];
	char padfh[NFS4_FHSIZE];
};

/* Byte size of struct pxy_fattr_storage, used as the fattr4 blob estimate */
#define FATTR_BLOB_SZ sizeof(struct pxy_fattr_storage)
99   	
100  	/*
101  	 * This is what becomes an opaque FSAL handle for the upper layers.
102  	 *
103  	 * The type is a placeholder for future expansion.
104  	 */
struct pxy_handle_blob {
	/* NOTE(review): presumably the length of the blob - confirm whether
	 * it counts the two header bytes */
	uint8_t len;
	/* placeholder for future expansion */
	uint8_t type;
	/* variable-length payload; zero-length array, so storage must be
	 * allocated past the end of the structure */
	uint8_t bytes[0];
};
110  	
/* Proxy-private object handle wrapping the generic FSAL object handle */
struct pxy_obj_handle {
	/* generic FSAL object handle embedded in the proxy handle */
	struct fsal_obj_handle obj;
	/* NFSv4 file handle associated with this object */
	nfs_fh4 fh4;
#ifdef PROXY_HANDLE_MAPPING
	/* only present when handle mapping support is compiled in */
	nfs23_map_handle_t h23;
#endif
	fsal_openflags_t openflags;
	/* kept as the last member: the blob ends in a zero-length array */
	struct pxy_handle_blob blob;
};
120  	
/* Forward declaration; the definition is not part of this section of the
 * file. */
static struct pxy_obj_handle *pxy_alloc_handle(struct fsal_export *exp,
					       const nfs_fh4 *fh,
					       fattr4 *obj_attributes,
					       struct attrlist *attrs_out);
125  	
/* Proxy-private state: the generic state_t plus an NFSv4 stateid */
struct pxy_state {
	/* generic state; pxy_free_state() recovers the container from it */
	struct state_t state;
	stateid4 stateid;
};
130  	
131  	struct state_t *pxy_alloc_state(struct fsal_export *exp_hdl,
132  					enum state_type state_type,
133  					struct state_t *related_state)
134  	{
135  		return init_state(gsh_calloc(1, sizeof(struct pxy_state)), exp_hdl,
136  				   state_type, related_state);
137  	}
138  	
139  	void pxy_free_state(struct fsal_export *exp_hdl, struct state_t *state)
140  	{
141  		struct pxy_state *pxy_state_id = container_of(state, struct pxy_state,
142  							      state);
143  	
144  		gsh_free(pxy_state_id);
145  	}
146  	
/*
 * Copy an fsal_verifier_t into a verifier4.
 * Both arguments are used as memcpy() pointers; the build breaks if the
 * two types ever differ in size.
 */
#define FSAL_VERIFIER_T_TO_VERIFIER4(verif4, fsal_verif)		\
do { \
	BUILD_BUG_ON(sizeof(fsal_verifier_t) != sizeof(verifier4));	\
	memcpy(verif4, fsal_verif, sizeof(fsal_verifier_t));		\
} while (0)
152  	
153  	static fsal_status_t nfsstat4_to_fsal(nfsstat4 nfsstatus)
154  	{
155  		switch (nfsstatus) {
156  		case NFS4ERR_SAME:
157  		case NFS4ERR_NOT_SAME:
158  		case NFS4_OK:
159  			return fsalstat(ERR_FSAL_NO_ERROR, (int)nfsstatus);
160  		case NFS4ERR_PERM:
161  			return fsalstat(ERR_FSAL_PERM, (int)nfsstatus);
162  		case NFS4ERR_NOENT:
163  			return fsalstat(ERR_FSAL_NOENT, (int)nfsstatus);
164  		case NFS4ERR_IO:
165  			return fsalstat(ERR_FSAL_IO, (int)nfsstatus);
166  		case NFS4ERR_NXIO:
167  			return fsalstat(ERR_FSAL_NXIO, (int)nfsstatus);
168  		case NFS4ERR_EXPIRED:
169  		case NFS4ERR_LOCKED:
170  		case NFS4ERR_SHARE_DENIED:
171  		case NFS4ERR_LOCK_RANGE:
172  		case NFS4ERR_OPENMODE:
173  		case NFS4ERR_FILE_OPEN:
174  		case NFS4ERR_ACCESS:
175  		case NFS4ERR_DENIED:
176  			return fsalstat(ERR_FSAL_ACCESS, (int)nfsstatus);
177  		case NFS4ERR_EXIST:
178  			return fsalstat(ERR_FSAL_EXIST, (int)nfsstatus);
179  		case NFS4ERR_XDEV:
180  			return fsalstat(ERR_FSAL_XDEV, (int)nfsstatus);
181  		case NFS4ERR_NOTDIR:
182  			return fsalstat(ERR_FSAL_NOTDIR, (int)nfsstatus);
183  		case NFS4ERR_ISDIR:
184  			return fsalstat(ERR_FSAL_ISDIR, (int)nfsstatus);
185  		case NFS4ERR_FBIG:
186  			return fsalstat(ERR_FSAL_FBIG, 0);
187  		case NFS4ERR_NOSPC:
188  			return fsalstat(ERR_FSAL_NOSPC, (int)nfsstatus);
189  		case NFS4ERR_ROFS:
190  			return fsalstat(ERR_FSAL_ROFS, (int)nfsstatus);
191  		case NFS4ERR_MLINK:
192  			return fsalstat(ERR_FSAL_MLINK, (int)nfsstatus);
193  		case NFS4ERR_NAMETOOLONG:
194  			return fsalstat(ERR_FSAL_NAMETOOLONG, (int)nfsstatus);
195  		case NFS4ERR_NOTEMPTY:
196  			return fsalstat(ERR_FSAL_NOTEMPTY, (int)nfsstatus);
197  		case NFS4ERR_DQUOT:
198  			return fsalstat(ERR_FSAL_DQUOT, (int)nfsstatus);
199  		case NFS4ERR_STALE:
200  			return fsalstat(ERR_FSAL_STALE, (int)nfsstatus);
201  		case NFS4ERR_NOFILEHANDLE:
202  		case NFS4ERR_BADHANDLE:
203  			return fsalstat(ERR_FSAL_BADHANDLE, (int)nfsstatus);
204  		case NFS4ERR_BAD_COOKIE:
205  			return fsalstat(ERR_FSAL_BADCOOKIE, (int)nfsstatus);
206  		case NFS4ERR_NOTSUPP:
207  			return fsalstat(ERR_FSAL_NOTSUPP, (int)nfsstatus);
208  		case NFS4ERR_TOOSMALL:
209  			return fsalstat(ERR_FSAL_TOOSMALL, (int)nfsstatus);
210  		case NFS4ERR_SERVERFAULT:
211  			return fsalstat(ERR_FSAL_SERVERFAULT, (int)nfsstatus);
212  		case NFS4ERR_BADTYPE:
213  			return fsalstat(ERR_FSAL_BADTYPE, (int)nfsstatus);
214  		case NFS4ERR_GRACE:
215  		case NFS4ERR_DELAY:
216  			return fsalstat(ERR_FSAL_DELAY, (int)nfsstatus);
217  		case NFS4ERR_FHEXPIRED:
218  			return fsalstat(ERR_FSAL_FHEXPIRED, (int)nfsstatus);
219  		case NFS4ERR_WRONGSEC:
220  			return fsalstat(ERR_FSAL_SEC, (int)nfsstatus);
221  		case NFS4ERR_SYMLINK:
222  			return fsalstat(ERR_FSAL_SYMLINK, (int)nfsstatus);
223  		case NFS4ERR_ATTRNOTSUPP:
224  			return fsalstat(ERR_FSAL_ATTRNOTSUPP, (int)nfsstatus);
225  		case NFS4ERR_BADNAME:
226  			return fsalstat(ERR_FSAL_BADNAME, (int)nfsstatus);
227  		case NFS4ERR_INVAL:
228  		case NFS4ERR_CLID_INUSE:
229  		case NFS4ERR_MOVED:
230  		case NFS4ERR_RESOURCE:
231  		case NFS4ERR_MINOR_VERS_MISMATCH:
232  		case NFS4ERR_STALE_CLIENTID:
233  		case NFS4ERR_STALE_STATEID:
234  		case NFS4ERR_OLD_STATEID:
235  		case NFS4ERR_BAD_STATEID:
236  		case NFS4ERR_BAD_SEQID:
237  		case NFS4ERR_RESTOREFH:
238  		case NFS4ERR_LEASE_MOVED:
239  		case NFS4ERR_NO_GRACE:
240  		case NFS4ERR_RECLAIM_BAD:
241  		case NFS4ERR_RECLAIM_CONFLICT:
242  		case NFS4ERR_BADXDR:
243  		case NFS4ERR_BADCHAR:
244  		case NFS4ERR_BAD_RANGE:
245  		case NFS4ERR_BADOWNER:
246  		case NFS4ERR_OP_ILLEGAL:
247  		case NFS4ERR_LOCKS_HELD:
248  		case NFS4ERR_LOCK_NOTSUPP:
249  		case NFS4ERR_DEADLOCK:
250  		case NFS4ERR_ADMIN_REVOKED:
251  		case NFS4ERR_CB_PATH_DOWN:
252  		default:
253  			return fsalstat(ERR_FSAL_INVAL, (int)nfsstatus);
254  		}
255  	}
256  	
257  	#define PXY_ATTR_BIT(b) (1U << b)
258  	#define PXY_ATTR_BIT2(b) (1U << (b - 32))
259  	
260  	static struct bitmap4 pxy_bitmap_getattr = {
261  		.map[0] =
262  		    (PXY_ATTR_BIT(FATTR4_SUPPORTED_ATTRS) |
263  		     PXY_ATTR_BIT(FATTR4_TYPE) | PXY_ATTR_BIT(FATTR4_CHANGE) |
264  		     PXY_ATTR_BIT(FATTR4_SIZE) | PXY_ATTR_BIT(FATTR4_FSID) |
265  		     PXY_ATTR_BIT(FATTR4_FILEID)),
266  		.map[1] =
267  		    (PXY_ATTR_BIT2(FATTR4_MODE) | PXY_ATTR_BIT2(FATTR4_NUMLINKS) |
268  		     PXY_ATTR_BIT2(FATTR4_OWNER) | PXY_ATTR_BIT2(FATTR4_OWNER_GROUP) |
269  		     PXY_ATTR_BIT2(FATTR4_SPACE_USED) |
270  		     PXY_ATTR_BIT2(FATTR4_TIME_ACCESS) |
271  		     PXY_ATTR_BIT2(FATTR4_TIME_METADATA) |
272  		     PXY_ATTR_BIT2(FATTR4_TIME_MODIFY) | PXY_ATTR_BIT2(FATTR4_RAWDEV)),
273  		.bitmap4_len = 2
274  	};
275  	
276  	static struct bitmap4 pxy_bitmap_fsinfo = {
277  		.map[0] =
278  		    (PXY_ATTR_BIT(FATTR4_FILES_AVAIL) | PXY_ATTR_BIT(FATTR4_FILES_FREE)
279  		     | PXY_ATTR_BIT(FATTR4_FILES_TOTAL)),
280  		.map[1] =
281  		    (PXY_ATTR_BIT2(FATTR4_SPACE_AVAIL) |
282  		     PXY_ATTR_BIT2(FATTR4_SPACE_FREE) |
283  		     PXY_ATTR_BIT2(FATTR4_SPACE_TOTAL)),
284  		.bitmap4_len = 2
285  	};
286  	
287  	static struct bitmap4 lease_bits = {
288  		.map[0] = PXY_ATTR_BIT(FATTR4_LEASE_TIME),
289  		.bitmap4_len = 1
290  	};
291  	
292  	static struct bitmap4 pxy_bitmap_per_file_system_attr = {
293  		.map[0] = PXY_ATTR_BIT(FATTR4_MAXREAD) | PXY_ATTR_BIT(FATTR4_MAXWRITE),
294  		.bitmap4_len = 1
295  	};
296  	
297  	#undef PXY_ATTR_BIT
298  	#undef PXY_ATTR_BIT2
299  	
300  	static struct {
301  		attrmask_t mask;
302  		int fattr_bit;
303  	} fsal_mask2bit[] = {
304  		{
305  		ATTR_SIZE, FATTR4_SIZE}, {
306  		ATTR_MODE, FATTR4_MODE}, {
307  		ATTR_OWNER, FATTR4_OWNER}, {
308  		ATTR_GROUP, FATTR4_OWNER_GROUP}, {
309  		ATTR_ATIME, FATTR4_TIME_ACCESS_SET}, {
310  		ATTR_ATIME_SERVER, FATTR4_TIME_ACCESS_SET}, {
311  		ATTR_MTIME, FATTR4_TIME_MODIFY_SET}, {
312  		ATTR_MTIME_SERVER, FATTR4_TIME_MODIFY_SET}, {
313  		ATTR_CTIME, FATTR4_TIME_METADATA}
314  	};
315  	
/*
 * All-zero attribute bitmap used as the starting value for bitmaps built
 * at run time. Three words are cleared but the advertised length is 2.
 */
static struct bitmap4 empty_bitmap = {
	.map[0] = 0,
	.map[1] = 0,
	.map[2] = 0,
	.bitmap4_len = 2
};
322  	
323  	static int pxy_fsalattr_to_fattr4(const struct attrlist *attrs, fattr4 *data)
324  	{
325  		int i;
326  		struct bitmap4 bmap = empty_bitmap;
327  		struct xdr_attrs_args args;
328  	
329  		for (i = 0; i < ARRAY_SIZE(fsal_mask2bit); i++) {
330  			if (FSAL_TEST_MASK(attrs->valid_mask, fsal_mask2bit[i].mask)) {
331  				if (fsal_mask2bit[i].fattr_bit > 31) {
332  					bmap.map[1] |=
333  					    1U << (fsal_mask2bit[i].fattr_bit - 32);
334  					bmap.bitmap4_len = 2;
335  				} else {
336  					bmap.map[0] |=
337  						1U << fsal_mask2bit[i].fattr_bit;
338  				}
339  			}
340  		}
341  	
342  		memset(&args, 0, sizeof(args));
343  		args.attrs = (struct attrlist *)attrs;
344  		args.data = NULL;
345  	
346  		return nfs4_FSALattr_To_Fattr(&args, &bmap, data);
347  	}
348  	
349  	static GETATTR4resok *pxy_fill_getattr_reply(nfs_resop4 *resop, char *blob,
350  						     size_t blob_sz)
351  	{
352  		GETATTR4resok *a = &resop->nfs_resop4_u.opgetattr.GETATTR4res_u.resok4;
353  	
354  		a->obj_attributes.attrmask = empty_bitmap;
355  		a->obj_attributes.attr_vals.attrlist4_val = blob;
356  		a->obj_attributes.attr_vals.attrlist4_len = blob_sz;
357  	
358  		return a;
359  	}
360  	
361  	static int pxy_got_rpc_reply(struct pxy_rpc_io_context *ctx, int sock, int sz,
362  				     u_int xid)
363  	{
364  		char *repbuf = ctx->recvbuf;
365  		int size;
366  	
367  		if (sz > ctx->recvbuf_sz)
368  			return -E2BIG;
369  	
370  		PTHREAD_MUTEX_lock(&ctx->iolock);
371  		memcpy(repbuf, &xid, sizeof(xid));
372  		/*
373  		 * sz includes 4 bytes of xid which have been processed
374  		 * together with record mark - reduce the read to avoid
375  		 * gobbing up next record mark.
376  		 */
377  		repbuf += 4;
378  		ctx->ioresult = 4;
379  		sz -= 4;
380  	
381  		while (sz > 0) {
382  			/* TODO: handle timeouts - use poll(2) */
383  			int bc = read(sock, repbuf, sz);
384  	
385  			if (bc <= 0) {
386  				ctx->ioresult = -((bc < 0) ? errno : ETIMEDOUT);
387  				break;
388  			}
389  			repbuf += bc;
390  			ctx->ioresult += bc;
391  			sz -= bc;
392  		}
393  		ctx->iodone = true;
394  		size = ctx->ioresult;
395  		pthread_cond_signal(&ctx->iowait);
396  		PTHREAD_MUTEX_unlock(&ctx->iolock);
397  		return size;
398  	}
399  	
400  	static int pxy_rpc_read_reply(struct pxy_export *pxy_exp)
401  	{
402  		struct {
403  			uint recmark;
404  			uint xid;
405  		} h;
406  		char *buf = (char *)&h;
407  		struct glist_head *c;
408  		char sink[256];
409  		int cnt = 0;
410  	
411  		while (cnt < 8) {
412  			int bc = read(pxy_exp->rpc.rpc_sock, buf + cnt, 8 - cnt);
413  	
414  			if (bc < 0)
415  				return -errno;
416  			cnt += bc;
417  		}
418  	
419  		h.recmark = ntohl(h.recmark);
420  		/* TODO: check for final fragment */
421  		h.xid = ntohl(h.xid);
422  	
423  		LogDebug(COMPONENT_FSAL, "Recmark %x, xid %u\n", h.recmark, h.xid);
424  		h.recmark &= ~(1U << 31);
425  	
426  		PTHREAD_MUTEX_lock(&pxy_exp->rpc.listlock);
427  		glist_for_each(c, &pxy_exp->rpc.rpc_calls) {
428  			struct pxy_rpc_io_context *ctx =
429  			    container_of(c, struct pxy_rpc_io_context, calls);
430  	
431  			if (ctx->rpc_xid == h.xid) {
432  				glist_del(c);
433  				PTHREAD_MUTEX_unlock(&pxy_exp->rpc.listlock);
434  				return pxy_got_rpc_reply(ctx, pxy_exp->rpc.rpc_sock,
435  							 h.recmark, h.xid);
436  			}
437  		}
438  		PTHREAD_MUTEX_unlock(&pxy_exp->rpc.listlock);
439  	
440  		cnt = h.recmark - 4;
441  		LogDebug(COMPONENT_FSAL, "xid %u is not on the list, skip %d bytes\n",
442  			 h.xid, cnt);
443  		while (cnt > 0) {
444  			int rb = (cnt > sizeof(sink)) ? sizeof(sink) : cnt;
445  	
446  			rb = read(pxy_exp->rpc.rpc_sock, sink, rb);
447  			if (rb <= 0)
448  				return -errno;
449  			cnt -= rb;
450  		}
451  	
452  		return 0;
453  	}
454  	
455  	/* called with listlock */
456  	static void pxy_new_socket_ready(struct pxy_export *pxy_exp)
457  	{
458  		struct glist_head *nxt;
459  		struct glist_head *c;
460  	
461  		/* If there are any outstanding calls then tell them to resend */
462  		glist_for_each_safe(c, nxt, &pxy_exp->rpc.rpc_calls) {
463  			struct pxy_rpc_io_context *ctx =
464  			    container_of(c, struct pxy_rpc_io_context, calls);
465  	
466  			glist_del(c);
467  	
468  			PTHREAD_MUTEX_lock(&ctx->iolock);
469  			ctx->iodone = true;
470  			ctx->ioresult = -EAGAIN;
471  			pthread_cond_signal(&ctx->iowait);
472  			PTHREAD_MUTEX_unlock(&ctx->iolock);
473  		}
474  	
475  		/* If there is anyone waiting for the socket then tell them
476  		 * it's ready */
477  		pthread_cond_broadcast(&pxy_exp->rpc.sockless);
478  	}
479  	
480  	/* called with listlock */
481  	static int pxy_connect(struct pxy_export *pxy_exp,
482  			       sockaddr_t *dest, uint16_t port)
483  	{
484  		int sock;
485  		int socklen;
486  	
487  		if (pxy_exp->info.use_privileged_client_port) {
488  			int priv_port = 0;
489  	
490  			sock = rresvport_af(&priv_port, dest->ss_family);
491  			if (sock < 0)
492  				LogCrit(COMPONENT_FSAL,
493  					"Cannot create TCP socket on privileged port");
494  		} else {
495  			sock = socket(dest->ss_family, SOCK_STREAM, IPPROTO_TCP);
496  			if (sock < 0)
497  				LogCrit(COMPONENT_FSAL, "Cannot create TCP socket - %d",
498  					errno);
499  		}
500  	
501  		switch (dest->ss_family) {
502  		case AF_INET:
503  			((struct sockaddr_in *)dest)->sin_port = htons(port);
504  			socklen = sizeof(struct sockaddr_in);
505  			break;
506  		case AF_INET6:
507  			((struct sockaddr_in6 *)dest)->sin6_port = htons(port);
508  			socklen = sizeof(struct sockaddr_in6);
509  			break;
510  		default:
511  			LogCrit(COMPONENT_FSAL, "Unknown address family %d",
512  				dest->ss_family);
513  			close(sock);
514  			return -1;
515  		}
516  	
517  		if (sock >= 0) {
518  			if (connect(sock, (struct sockaddr *)dest, socklen) < 0) {
519  				close(sock);
520  				sock = -1;
521  			} else {
522  				pxy_new_socket_ready(pxy_exp);
523  			}
524  		}
525  		return sock;
526  	}
527  	
528  	/*
529  	 * NB! rpc_sock can be closed by the sending thread but it will not be
530  	 *     changing its value. Only this function will change rpc_sock which
531  	 *     means that it can look at the value without holding the lock.
532  	 */
533  	static void *pxy_rpc_recv(void *arg)
534  	{
535  		struct pxy_export *pxy_exp = arg;
536  		char addr[INET6_ADDRSTRLEN];
537  		struct pollfd pfd;
538  		int millisec = pxy_exp->info.srv_timeout * 1000;
539  	
540  		SetNameFunction("pxy_rcv_thread");
541  	
542  		rcu_register_thread();
543  		while (!pxy_exp->rpc.close_thread) {
544  			int nsleeps = 0;
545  	
546  			PTHREAD_MUTEX_lock(&pxy_exp->rpc.listlock);
547  			do {
548  				pxy_exp->rpc.rpc_sock = pxy_connect(pxy_exp,
549  							&pxy_exp->info.srv_addr,
550  							pxy_exp->info.srv_port);
551  				/* early stop test */
552  				if (pxy_exp->rpc.close_thread) {
553  					PTHREAD_MUTEX_unlock(&pxy_exp->rpc.listlock);
554  					goto out;
555  				}
556  				if (pxy_exp->rpc.rpc_sock < 0) {
557  					if (nsleeps == 0)
558  						sprint_sockaddr(&pxy_exp->info.srv_addr,
559  								addr, sizeof(addr));
560  						LogCrit(COMPONENT_FSAL,
561  							"Cannot connect to server %s:%u",
562  							addr, pxy_exp->info.srv_port);
563  					PTHREAD_MUTEX_unlock(&pxy_exp->rpc.listlock);
564  					sleep(pxy_exp->info.retry_sleeptime);
565  					nsleeps++;
566  					PTHREAD_MUTEX_lock(&pxy_exp->rpc.listlock);
567  				} else {
568  					LogDebug(COMPONENT_FSAL,
569  						 "Connected after %d sleeps, resending outstanding calls",
570  						 nsleeps);
571  				}
572  			} while (pxy_exp->rpc.rpc_sock < 0 &&
573  							!pxy_exp->rpc.close_thread);
574  			PTHREAD_MUTEX_unlock(&pxy_exp->rpc.listlock);
575  			/* early stop test */
576  			if (pxy_exp->rpc.close_thread)
577  				goto out;
578  	
579  			pfd.fd = pxy_exp->rpc.rpc_sock;
580  			pfd.events = POLLIN | POLLRDHUP;
581  	
582  			while (pxy_exp->rpc.rpc_sock >= 0) {
583  				switch (poll(&pfd, 1, millisec)) {
584  				case 0:
585  					LogDebug(COMPONENT_FSAL,
586  						 "Timeout, wait again...");
587  					continue;
588  	
589  				case -1:
590  					break;
591  	
592  				default:
593  					if (pfd.revents & POLLRDHUP) {
594  						LogEvent(COMPONENT_FSAL,
595  							 "Other end has closed connection, reconnecting...");
596  					} else if (pfd.revents & POLLNVAL) {
597  						LogEvent(COMPONENT_FSAL,
598  							 "Socket is closed");
599  					} else {
600  						if (pxy_rpc_read_reply(pxy_exp) >= 0)
601  							continue;
602  					}
603  					break;
604  				}
605  	
606  				PTHREAD_MUTEX_lock(&pxy_exp->rpc.listlock);
607  				close(pxy_exp->rpc.rpc_sock);
608  				pxy_exp->rpc.rpc_sock = -1;
609  				PTHREAD_MUTEX_unlock(&pxy_exp->rpc.listlock);
610  			}
611  		}
612  	out:
613  		rcu_unregister_thread();
614  		return NULL;
615  	}
616  	
617  	static enum clnt_stat pxy_process_reply(struct pxy_rpc_io_context *ctx,
618  						COMPOUND4res *res)
619  	{
620  		enum clnt_stat rc = RPC_CANTRECV;
621  		struct timespec ts;
622  	
623  		PTHREAD_MUTEX_lock(&ctx->iolock);
624  		ts.tv_sec = time(NULL) + 60;
625  		ts.tv_nsec = 0;
626  	
627  		while (!ctx->iodone) {
628  			int w = pthread_cond_timedwait(&ctx->iowait, &ctx->iolock, &ts);
629  	
630  			if (w == ETIMEDOUT) {
631  				PTHREAD_MUTEX_unlock(&ctx->iolock);
632  				return RPC_TIMEDOUT;
633  			}
634  		}
635  	
636  		ctx->iodone = false;
637  		PTHREAD_MUTEX_unlock(&ctx->iolock);
638  	
639  		if (ctx->ioresult > 0) {
640  			struct rpc_msg reply;
641  			XDR x;
642  	
643  			memset(&reply, 0, sizeof(reply));
644  			reply.RPCM_ack.ar_results.proc =
645  			    (xdrproc_t) xdr_COMPOUND4res;
646  			reply.RPCM_ack.ar_results.where = res;
647  	
648  			memset(&x, 0, sizeof(x));
649  			xdrmem_create(&x, ctx->recvbuf, ctx->ioresult, XDR_DECODE);
650  	
651  			/* macro is defined, GCC 4.7.2 ignoring */
652  			if (xdr_replymsg(&x, &reply)) {
653  				if (reply.rm_reply.rp_stat == MSG_ACCEPTED) {
654  					switch (reply.rm_reply.rp_acpt.ar_stat) {
655  					case SUCCESS:
656  						rc = RPC_SUCCESS;
657  						break;
658  					case PROG_UNAVAIL:
659  						rc = RPC_PROGUNAVAIL;
660  						break;
661  					case PROG_MISMATCH:
662  						rc = RPC_PROGVERSMISMATCH;
663  						break;
664  					case PROC_UNAVAIL:
665  						rc = RPC_PROCUNAVAIL;
666  						break;
667  					case GARBAGE_ARGS:
668  						rc = RPC_CANTDECODEARGS;
669  						break;
670  					case SYSTEM_ERR:
671  						rc = RPC_SYSTEMERROR;
672  						break;
673  					default:
674  						rc = RPC_FAILED;
675  						break;
676  					}
677  				} else {
678  					switch (reply.rm_reply.rp_rjct.rj_stat) {
679  					case RPC_MISMATCH:
680  						rc = RPC_VERSMISMATCH;
681  						break;
682  					case AUTH_ERROR:
683  						rc = RPC_AUTHERROR;
684  						break;
685  					default:
686  						rc = RPC_FAILED;
687  						break;
688  					}
689  				}
690  			} else {
691  				rc = RPC_CANTDECODERES;
692  			}
693  	
694  			reply.RPCM_ack.ar_results.proc = (xdrproc_t) xdr_void;
695  			reply.RPCM_ack.ar_results.where = NULL;
696  	
697  			xdr_free((xdrproc_t) xdr_replymsg, &reply);
698  		}
699  		return rc;
700  	}
701  	
702  	static inline int pxy_rpc_need_sock(struct pxy_export *pxy_exp)
703  	{
704  		PTHREAD_MUTEX_lock(&pxy_exp->rpc.listlock);
705  		while (pxy_exp->rpc.rpc_sock < 0 && !pxy_exp->rpc.close_thread)
706  			pthread_cond_wait(&pxy_exp->rpc.sockless,
707  					  &pxy_exp->rpc.listlock);
708  		PTHREAD_MUTEX_unlock(&pxy_exp->rpc.listlock);
709  		return pxy_exp->rpc.close_thread;
710  	}
711  	
712  	static inline int pxy_rpc_renewer_wait(int timeout, struct pxy_export *pxy_exp)
713  	{
714  		struct timespec ts;
715  		int rc;
716  	
717  		PTHREAD_MUTEX_lock(&pxy_exp->rpc.listlock);
718  		ts.tv_sec = time(NULL) + timeout;
719  		ts.tv_nsec = 0;
720  	
721  		rc = pthread_cond_timedwait(&pxy_exp->rpc.sockless,
722  					    &pxy_exp->rpc.listlock, &ts);
723  		PTHREAD_MUTEX_unlock(&pxy_exp->rpc.listlock);
724  		return (rc == ETIMEDOUT);
725  	}
726  	
727  	static int pxy_compoundv4_call(struct pxy_rpc_io_context *pcontext,
728  				       const struct user_cred *cred,
729  				       COMPOUND4args *args, COMPOUND4res *res,
730  				       struct pxy_export *pxy_exp)
731  	{
732  		XDR x;
733  		struct rpc_msg rmsg;
734  		AUTH *au;
735  		enum clnt_stat rc;
736  	
737  		PTHREAD_MUTEX_lock(&pxy_exp->rpc.listlock);
738  		rmsg.rm_xid = pxy_exp->rpc.rpc_xid++;
739  		PTHREAD_MUTEX_unlock(&pxy_exp->rpc.listlock);
740  		rmsg.rm_direction = CALL;
741  	
742  		rmsg.rm_call.cb_rpcvers = RPC_MSG_VERSION;
743  		rmsg.cb_prog = pcontext->nfs_prog;
744  		rmsg.cb_vers = FSAL_PROXY_NFS_V4;
745  		rmsg.cb_proc = NFSPROC4_COMPOUND;
746  	
747  		if (cred) {
748  			au = authunix_ncreate(pxy_exp->rpc.pxy_hostname,
749  					      cred->caller_uid, cred->caller_gid,
750  					      cred->caller_glen, cred->caller_garray);
751  		} else {
752  			au = authunix_ncreate_default();
753  		}
754  		if (AUTH_FAILURE(au)) {
755  			char *err = rpc_sperror(&au->ah_error, "failed");
756  	
757  			LogDebug(COMPONENT_FSAL, "%s", err);
758  			gsh_free(err);
759  			AUTH_DESTROY(au);
760  			return RPC_AUTHERROR;
761  		}
762  	
763  		rmsg.cb_cred = au->ah_cred;
764  		rmsg.cb_verf = au->ah_verf;
765  	
766  		memset(&x, 0, sizeof(x));
767  		xdrmem_create(&x, pcontext->sendbuf + 4, pcontext->sendbuf_sz,
768  			      XDR_ENCODE);
769  		if (xdr_callmsg(&x, &rmsg) && xdr_COMPOUND4args(&x, args)) {
770  			u_int pos = xdr_getpos(&x);
771  			u_int recmark = ntohl(pos | (1U << 31));
772  			int first_try = 1;
773  	
774  			pcontext->rpc_xid = rmsg.rm_xid;
775  	
776  			memcpy(pcontext->sendbuf, &recmark, sizeof(recmark));
777  			pos += 4;
778  	
779  			do {
780  				int bc = 0;
781  				char *buf = pcontext->sendbuf;
782  	
783  				LogDebug(COMPONENT_FSAL, "%ssend XID %u with %d bytes",
784  					 (first_try ? "First attempt to " : "Re"),
785  					 rmsg.rm_xid, pos);
786  				PTHREAD_MUTEX_lock(&pxy_exp->rpc.listlock);
787  				while (bc < pos) {
788  					int wc = write(pxy_exp->rpc.rpc_sock, buf,
789  						       pos - bc);
790  	
791  					if (wc <= 0) {
792  						close(pxy_exp->rpc.rpc_sock);
793  						break;
794  					}
795  					bc += wc;
796  					buf += wc;
797  				}
798  	
799  				if (bc == pos) {
800  					if (first_try) {
801  						glist_add_tail(&pxy_exp->rpc.rpc_calls,
802  							       &pcontext->calls);
803  						first_try = 0;
804  					}
805  				} else {
806  					if (!first_try)
807  						glist_del(&pcontext->calls);
808  				}
809  				PTHREAD_MUTEX_unlock(&pxy_exp->rpc.listlock);
810  	
811  				if (bc == pos)
812  					rc = pxy_process_reply(pcontext, res);
813  				else
814  					rc = RPC_CANTSEND;
815  			} while (rc == RPC_TIMEDOUT);
816  		} else {
817  			rc = RPC_CANTENCODEARGS;
818  		}
819  	
820  		auth_destroy(au);
821  		return rc;
822  	}
823  	
824  	int pxy_compoundv4_execute(const char *caller, const struct user_cred *creds,
825  				   uint32_t cnt, nfs_argop4 *argoparray,
826  				   nfs_resop4 *resoparray, struct pxy_export *pxy_exp)
827  	{
828  		enum clnt_stat rc;
829  		struct pxy_rpc_io_context *ctx;
830  		COMPOUND4args arg = {
831  			.minorversion = FSAL_PROXY_NFS_V4_MINOR,
832  			.argarray.argarray_val = argoparray,
833  			.argarray.argarray_len = cnt
834  		};
835  		COMPOUND4res res = {
836  			.resarray.resarray_val = resoparray,
837  			.resarray.resarray_len = cnt
838  		};
839  	
840  		PTHREAD_MUTEX_lock(&pxy_exp->rpc.context_lock);
841  		while (glist_empty(&pxy_exp->rpc.free_contexts))
842  			pthread_cond_wait(&pxy_exp->rpc.need_context,
843  					  &pxy_exp->rpc.context_lock);
844  		ctx =
845  		    glist_first_entry(&pxy_exp->rpc.free_contexts,
846  				      struct pxy_rpc_io_context, calls);
847  		glist_del(&ctx->calls);
848  		PTHREAD_MUTEX_unlock(&pxy_exp->rpc.context_lock);
849  	
850  		/* fill slotid and sequenceid */
851  		if (argoparray->argop == NFS4_OP_SEQUENCE) {
852  			SEQUENCE4args *opsequence =
853  						&argoparray->nfs_argop4_u.opsequence;
854  	
855  			/* set slotid */
856  			opsequence->sa_slotid = ctx->slotid;
857  			/* increment and set sequence id */
858  			opsequence->sa_sequenceid = ++ctx->seqid;
859  		}
860  	
861  		do {
862  			rc = pxy_compoundv4_call(ctx, creds, &arg, &res, pxy_exp);
863  			if (rc != RPC_SUCCESS)
864  				LogDebug(COMPONENT_FSAL, "%s failed with %d", caller,
865  					 rc);
866  			if (rc == RPC_CANTSEND)
867  				if (pxy_rpc_need_sock(pxy_exp))
868  					return -1;
869  		} while ((rc == RPC_CANTRECV && (ctx->ioresult == -EAGAIN))
870  			 || (rc == RPC_CANTSEND));
871  	
872  		PTHREAD_MUTEX_lock(&pxy_exp->rpc.context_lock);
873  		pthread_cond_signal(&pxy_exp->rpc.need_context);
874  		glist_add(&pxy_exp->rpc.free_contexts, &ctx->calls);
875  		PTHREAD_MUTEX_unlock(&pxy_exp->rpc.context_lock);
876  	
877  		if (rc == RPC_SUCCESS)
878  			return res.status;
879  		return rc;
880  	}
881  	
882  	static inline int pxy_nfsv4_call(const struct user_cred *creds, uint32_t cnt,
883  					 nfs_argop4 *args, nfs_resop4 *resp)
884  	{
885  		struct pxy_export *pxy_exp = container_of(op_ctx->fsal_export,
886  							  struct pxy_export, exp);
887  	
888  		return pxy_compoundv4_execute(__func__, creds, cnt, args, resp,
889  					      pxy_exp);
890  	}
891  	
892  	static inline void pxy_get_clientid(struct pxy_export *pxy_exp, clientid4 *ret)
893  	{
894  		PTHREAD_MUTEX_lock(&pxy_exp->rpc.pxy_clientid_mutex);
895  		*ret = pxy_exp->rpc.pxy_clientid;
896  		PTHREAD_MUTEX_unlock(&pxy_exp->rpc.pxy_clientid_mutex);
897  	}
898  	
899  	static inline void pxy_get_client_sessionid(sessionid4 ret)
900  	{
901  		struct pxy_export *pxy_exp = container_of(op_ctx->fsal_export,
902  							  struct pxy_export, exp);
903  	
904  		PTHREAD_MUTEX_lock(&pxy_exp->rpc.pxy_clientid_mutex);
905  		while (pxy_exp->rpc.no_sessionid)
906  			pthread_cond_wait(&pxy_exp->rpc.cond_sessionid,
907  					  &pxy_exp->rpc.pxy_clientid_mutex);
908  		memcpy(ret, pxy_exp->rpc.pxy_client_sessionid, sizeof(sessionid4));
909  		PTHREAD_MUTEX_unlock(&pxy_exp->rpc.pxy_clientid_mutex);
910  	}
911  	
912  	static inline void pxy_get_client_sessionid_export(sessionid4 ret,
913  							   struct pxy_export *pxy_exp)
914  	{
915  		PTHREAD_MUTEX_lock(&pxy_exp->rpc.pxy_clientid_mutex);
916  		while (pxy_exp->rpc.no_sessionid)
917  			pthread_cond_wait(&pxy_exp->rpc.cond_sessionid,
918  					  &pxy_exp->rpc.pxy_clientid_mutex);
919  		memcpy(ret, pxy_exp->rpc.pxy_client_sessionid, sizeof(sessionid4));
920  		PTHREAD_MUTEX_unlock(&pxy_exp->rpc.pxy_clientid_mutex);
921  	}
922  	
923  	static inline void pxy_get_client_seqid(struct pxy_export *pxy_exp,
924  						sequenceid4 *ret)
925  	{
926  		PTHREAD_MUTEX_lock(&pxy_exp->rpc.pxy_clientid_mutex);
927  		*ret = pxy_exp->rpc.pxy_client_seqid;
928  		PTHREAD_MUTEX_unlock(&pxy_exp->rpc.pxy_clientid_mutex);
929  	}
930  	
/**
 * Create a new session for the export's client id and query the lease
 * time of the remote server.
 *
 * Two compounds are sent: CREATE_SESSION, then
 * SEQUENCE + RECLAIM_COMPLETE + PUTROOTFH + GETATTR(lease time).
 *
 * @param[out] new_sessionid The new session id
 * @param[out] lease_time Lease time of the remote server, or 60 when the
 *                        second compound fails
 * @param[in] pxy_exp Proxy export the session is created for
 *
 * @return 0 on success, -1 when the session could not be created
 */
static int pxy_setsessionid(sessionid4 new_sessionid, uint32_t *lease_time,
			    struct pxy_export *pxy_exp)
{
	int rc;
	int opcnt = 0;
	/* CREATE_SESSION to set session id */
	/* SEQUENCE RECLAIM_COMPLETE PUTROOTFH GETATTR to get lease time */
#define FSAL_SESSIONID_NB_OP_ALLOC 4
	nfs_argop4 arg[FSAL_SESSIONID_NB_OP_ALLOC];
	nfs_resop4 res[FSAL_SESSIONID_NB_OP_ALLOC];
	clientid4 cid;
	sequenceid4 seqid;
	CREATE_SESSION4res *s_res;
	CREATE_SESSION4resok *res_ok;
	callback_sec_parms4 sec_parms4;
	uint32_t fore_ca_rdma_ird_val_sink;
	uint32_t back_ca_rdma_ird_val_sink;

	pxy_get_clientid(pxy_exp, &cid);
	pxy_get_client_seqid(pxy_exp, &seqid);
	LogDebug(COMPONENT_FSAL, "Getting new session id for client id %"PRIx64
				" with sequence id %"PRIx32, cid, seqid);
	/* CREATE_SESSION is the first (and only) op of the first compound;
	 * give the rdma_ird arrays of its result local storage */
	s_res = &res->nfs_resop4_u.opcreate_session;
	res_ok = &s_res->CREATE_SESSION4res_u.csr_resok4;
	res_ok->csr_fore_chan_attrs.ca_rdma_ird.ca_rdma_ird_len = 0;
	res_ok->csr_fore_chan_attrs.ca_rdma_ird.ca_rdma_ird_val =
						&fore_ca_rdma_ird_val_sink;
	res_ok->csr_back_chan_attrs.ca_rdma_ird.ca_rdma_ird_len = 0;
	res_ok->csr_back_chan_attrs.ca_rdma_ird.ca_rdma_ird_val =
						&back_ca_rdma_ird_val_sink;

	COMPOUNDV4_ARG_ADD_OP_CREATE_SESSION(opcnt, arg, cid, seqid,
					     (&(pxy_exp->info)), &sec_parms4);
	rc = pxy_compoundv4_execute(__func__, NULL, opcnt, arg, res, pxy_exp);
	if (rc != NFS4_OK)
		return -1;

	/* the compound succeeded, now check the operation itself */
	if (s_res->csr_status != NFS4_OK)
		return -1;

	memcpy(new_sessionid,
	       res_ok->csr_sessionid,
	       sizeof(sessionid4));

	/* Get the lease time */
	opcnt = 0;
	COMPOUNDV4_ARG_ADD_OP_SEQUENCE(opcnt, arg, new_sessionid, NB_RPC_SLOT);
	COMPOUNDV4_ARG_ADD_OP_GLOBAL_RECLAIM_COMPLETE(opcnt, arg);
	COMPOUNDV4_ARG_ADD_OP_PUTROOTFH(opcnt, arg);
	/* the raw attribute value is decoded straight into *lease_time */
	pxy_fill_getattr_reply(res + opcnt, (char *)lease_time,
			       sizeof(*lease_time));
	COMPOUNDV4_ARG_ADD_OP_GETATTR(opcnt, arg, lease_bits);

	rc = pxy_compoundv4_execute(__func__, NULL, opcnt, arg, res, pxy_exp);
	if (rc != NFS4_OK) {
		/* a failure here is not fatal: fall back to 60 */
		*lease_time = 60;
		LogDebug(COMPONENT_FSAL,
			 "Setting new lease_time to default %d", *lease_time);
	} else {
		/* the value arrives in network byte order */
		*lease_time = ntohl(*lease_time);
		LogDebug(COMPONENT_FSAL,
			 "Getting new lease %d", *lease_time);
	}

	return 0;
}
1006 	
1007 	static int pxy_setclientid(clientid4 *new_clientid, sequenceid4 *new_seqid,
1008 				   struct pxy_export *pxy_exp)
1009 	{
1010 		int rc;
1011 	#define FSAL_EXCHANGE_ID_NB_OP_ALLOC 1
1012 		nfs_argop4 arg[FSAL_EXCHANGE_ID_NB_OP_ALLOC];
1013 		nfs_resop4 res[FSAL_EXCHANGE_ID_NB_OP_ALLOC];
1014 		client_owner4 clientid;
1015 		char clientid_name[MAXNAMLEN + 1];
1016 		uint64_t temp_verifier;
1017 		EXCHANGE_ID4args opexchange_id;
1018 		EXCHANGE_ID4res *ei_res;
1019 		EXCHANGE_ID4resok *ei_resok;
1020 		char so_major_id_val[NFS4_OPAQUE_LIMIT];
1021 		char eir_server_scope_val[NFS4_OPAQUE_LIMIT];
1022 		nfs_impl_id4 eir_server_impl_id_val;
1023 		struct sockaddr_in sin;
1024 		socklen_t slen = sizeof(sin);
1025 		char addrbuf[sizeof("255.255.255.255")];
1026 	
1027 		LogEvent(COMPONENT_FSAL,
1028 			 "Negotiating a new ClientId with the remote server");
1029 	
1030 		/* prepare input */
1031 		if (getsockname(pxy_exp->rpc.rpc_sock, &sin, &slen))
1032 			return -errno;
1033 	
1034 		snprintf(clientid_name, MAXNAMLEN, "%s(%d) - GANESHA NFSv4 Proxy",
1035 			 inet_ntop(AF_INET, &sin.sin_addr, addrbuf, sizeof(addrbuf)),
1036 			 getpid());
1037 		clientid.co_ownerid.co_ownerid_len = strlen(clientid_name);
1038 		clientid.co_ownerid.co_ownerid_val = clientid_name;
1039 	
1040 		/* copy to intermediate uint64_t to 0-fill or truncate as needed */
1041 		temp_verifier = (uint64_t)nfs_ServerBootTime.tv_sec;
1042 		BUILD_BUG_ON(sizeof(clientid.co_verifier) != sizeof(uint64_t));
1043 		memcpy(&clientid.co_verifier, &temp_verifier, sizeof(uint64_t));
1044 	
1045 	
1046 		arg[0].argop = NFS4_OP_EXCHANGE_ID;
1047 		opexchange_id.eia_clientowner = clientid;
1048 		opexchange_id.eia_flags = 0;
1049 		opexchange_id.eia_state_protect.spa_how = SP4_NONE;
1050 		opexchange_id.eia_client_impl_id.eia_client_impl_id_len = 0;
1051 		opexchange_id.eia_client_impl_id.eia_client_impl_id_val = NULL;
1052 		arg[0].nfs_argop4_u.opexchange_id = opexchange_id;
1053 	
1054 		/* prepare reply */
1055 		ei_res = &res->nfs_resop4_u.opexchange_id;
1056 		ei_resok = &ei_res->EXCHANGE_ID4res_u.eir_resok4;
1057 		ei_resok->eir_server_owner.so_major_id.so_major_id_val =
1058 								so_major_id_val;
1059 		ei_resok->eir_server_scope.eir_server_scope_val = eir_server_scope_val;
1060 		ei_resok->eir_server_impl_id.eir_server_impl_id_val =
1061 							&eir_server_impl_id_val;
1062 	
1063 		rc = pxy_compoundv4_execute(__func__, NULL, 1, arg, res, pxy_exp);
1064 		if (rc != NFS4_OK) {
1065 			LogDebug(COMPONENT_FSAL,
1066 				 "Compound setclientid res request returned %d",
1067 				 rc);
1068 			return -1;
1069 		}
1070 	
1071 		/* Keep the confirmed client id and sequence id*/
1072 		if (ei_res->eir_status != NFS4_OK) {
1073 			LogDebug(COMPONENT_FSAL, "EXCHANGE_ID res status is %d",
1074 				 ei_res->eir_status);
1075 			return -1;
1076 		}
1077 		*new_clientid = ei_resok->eir_clientid;
1078 		*new_seqid = ei_resok->eir_sequenceid;
1079 	
1080 		return 0;
1081 	}
1082 	
1083 	static void *pxy_clientid_renewer(void *arg)
1084 	{
1085 		struct pxy_export *pxy_exp = arg;
1086 		int clientid_needed = 1;
1087 		int sessionid_needed = 1;
1088 		uint32_t lease_time = 60;
1089 	
1090 		SetNameFunction("pxy_clientid_renewer");
1091 	
1092 		rcu_register_thread();
(1) Event cond_true: Condition "!pxy_exp->rpc.close_thread", taking true branch.
(24) Event loop_begin: Jumped back to beginning of loop.
(25) Event cond_true: Condition "!pxy_exp->rpc.close_thread", taking true branch.
(48) Event cond_true: Condition "!pxy_exp->rpc.close_thread", taking true branch.
1093 		while (!pxy_exp->rpc.close_thread) {
1094 			clientid4 newcid = 0;
1095 			sequenceid4 newseqid = 0;
1096 	
(2) Event cond_false: Condition "!sessionid_needed", taking false branch.
(26) Event cond_false: Condition "!sessionid_needed", taking false branch.
(49) Event cond_true: Condition "!sessionid_needed", taking true branch.
(50) Event cond_true: Condition "pxy_rpc_renewer_wait(lease_time - 5, pxy_exp)", taking true branch.
1097 			if (!sessionid_needed &&
1098 					pxy_rpc_renewer_wait(lease_time - 5, pxy_exp)) {
1099 				/* Simply renew the session id you've got */
1100 				nfs_argop4 seq_arg;
1101 				nfs_resop4 res;
1102 				int opcnt = 0;
1103 				int rc;
1104 				sessionid4 sid;
1105 				clientid4 cid;
1106 				SEQUENCE4res *s_res;
1107 				SEQUENCE4resok *s_resok;
1108 	
1109 				pxy_get_clientid(pxy_exp, &cid);
1110 				pxy_get_client_sessionid_export(sid, pxy_exp);
(51) Event cond_true: Condition "!!(component_log_level[COMPONENT_FSAL] >= NIV_DEBUG)", taking true branch.
(52) Event cond_true: Condition "!!(component_log_level[COMPONENT_FSAL] >= NIV_DEBUG)", taking true branch.
1111 				LogDebug(COMPONENT_FSAL,
1112 					 "Try renew session id for client id %"PRIx64,
1113 					 cid);
(53) Event address_of: Taking address with "&seq_arg" yields a singleton pointer.
(54) Event ptr_arith: Using "&seq_arg" as an array. This might corrupt or misinterpret adjacent memory locations.
1114 				COMPOUNDV4_ARG_ADD_OP_SEQUENCE(opcnt, &seq_arg, sid,
1115 							       NB_RPC_SLOT);
1116 				s_res = &res.nfs_resop4_u.opsequence;
1117 				s_resok = &s_res->SEQUENCE4res_u.sr_resok4;
1118 				s_resok->sr_status_flags = 0;
1119 				rc = pxy_compoundv4_execute(__func__, NULL, 1, &seq_arg,
1120 							    &res, pxy_exp);
1121 				if (rc == NFS4_OK && !s_resok->sr_status_flags) {
1122 					LogDebug(COMPONENT_FSAL,
1123 						 "New session id for client id %"PRIu64,
1124 						 cid);
1125 					continue;
1126 				} else if (rc == NFS4_OK &&
1127 					   s_resok->sr_status_flags) {
1128 					LogEvent(COMPONENT_FSAL,
1129 		"sr_status_flags received on renewing session with seqop : %"PRIu32,
1130 						 s_resok->sr_status_flags);
1131 					continue;
1132 				} else if (rc != NFS4_OK) {
1133 					LogEvent(COMPONENT_FSAL,
1134 						 "Failed to renew session");
1135 				}
(3) Event if_end: End of if statement.
(27) Event if_end: End of if statement.
1136 			}
1137 	
1138 			/* early stop test */
(4) Event cond_false: Condition "pxy_exp->rpc.close_thread", taking false branch.
(28) Event cond_false: Condition "pxy_exp->rpc.close_thread", taking false branch.
1139 			if (pxy_exp->rpc.close_thread)
(5) Event if_end: End of if statement.
(29) Event if_end: End of if statement.
1140 				break;
1141 	
1142 			/* We've either failed to renew or rpc socket has been
1143 			 * reconnected and we need new clientid or sessionid. */
(6) Event cond_false: Condition "pxy_rpc_need_sock(pxy_exp)", taking false branch.
(30) Event cond_false: Condition "pxy_rpc_need_sock(pxy_exp)", taking false branch.
1144 			if (pxy_rpc_need_sock(pxy_exp))
1145 				/* early stop test */
(7) Event if_end: End of if statement.
(31) Event if_end: End of if statement.
1146 				break;
1147 	
1148 			/* We need a new session_id */
(8) Event cond_false: Condition "!clientid_needed", taking false branch.
(32) Event cond_true: Condition "!clientid_needed", taking true branch.
1149 			if (!clientid_needed) {
1150 				sessionid4 new_sessionid;
1151 	
(33) Event cond_true: Condition "!!(component_log_level[COMPONENT_FSAL] >= NIV_DEBUG)", taking true branch.
(34) Event cond_true: Condition "!!(component_log_level[COMPONENT_FSAL] >= NIV_DEBUG)", taking true branch.
1152 				LogDebug(COMPONENT_FSAL, "Need %d new session id",
1153 					 sessionid_needed);
1154 				sessionid_needed = pxy_setsessionid(new_sessionid,
1155 								&lease_time, pxy_exp);
(35) Event cond_true: Condition "!sessionid_needed", taking true branch.
1156 				if (!sessionid_needed) {
(36) Event cond_true: Condition "rc == 0", taking true branch.
(37) Event cond_true: Condition "!!(component_log_level[COMPONENT_RW_LOCK] >= NIV_FULL_DEBUG)", taking true branch.
(38) Event cond_true: Condition "!!(component_log_level[COMPONENT_RW_LOCK] >= NIV_FULL_DEBUG)", taking true branch.
(39) Event if_fallthrough: Falling through to end of if statement.
(40) Event if_end: End of if statement.
1157 					PTHREAD_MUTEX_lock(
1158 						&pxy_exp->rpc.pxy_clientid_mutex);
1159 					/* Set new session id */
1160 					memcpy(pxy_exp->rpc.pxy_client_sessionid,
1161 					       new_sessionid, sizeof(sessionid4));
1162 					pxy_exp->rpc.no_sessionid = false;
1163 					pthread_cond_broadcast(
1164 							&pxy_exp->rpc.cond_sessionid);
1165 					/*
1166 					 * We finish our create session request next
1167 					 * one will use the next client sequence id.
1168 					 */
1169 					pxy_exp->rpc.pxy_client_seqid++;
(41) Event cond_true: Condition "rc == 0", taking true branch.
(42) Event cond_true: Condition "!!(component_log_level[COMPONENT_RW_LOCK] >= NIV_FULL_DEBUG)", taking true branch.
(43) Event cond_true: Condition "!!(component_log_level[COMPONENT_RW_LOCK] >= NIV_FULL_DEBUG)", taking true branch.
(44) Event if_fallthrough: Falling through to end of if statement.
(45) Event if_end: End of if statement.
1170 					PTHREAD_MUTEX_unlock(
1171 						&pxy_exp->rpc.pxy_clientid_mutex);
(46) Event continue: Continuing loop.
1172 					continue;
1173 				}
(9) Event if_end: End of if statement.
1174 			}
1175 	
(10) Event cond_true: Condition "!!(component_log_level[COMPONENT_FSAL] >= NIV_DEBUG)", taking true branch.
(11) Event cond_true: Condition "!!(component_log_level[COMPONENT_FSAL] >= NIV_DEBUG)", taking true branch.
1176 			LogDebug(COMPONENT_FSAL, "Need %d new client id",
1177 				 clientid_needed);
1178 			clientid_needed = pxy_setclientid(&newcid, &newseqid, pxy_exp);
(12) Event cond_true: Condition "!clientid_needed", taking true branch.
1179 			if (!clientid_needed) {
(13) Event cond_true: Condition "rc == 0", taking true branch.
(14) Event cond_true: Condition "!!(component_log_level[COMPONENT_RW_LOCK] >= NIV_FULL_DEBUG)", taking true branch.
(15) Event cond_true: Condition "!!(component_log_level[COMPONENT_RW_LOCK] >= NIV_FULL_DEBUG)", taking true branch.
(16) Event if_fallthrough: Falling through to end of if statement.
(17) Event if_end: End of if statement.
1180 				PTHREAD_MUTEX_lock(&pxy_exp->rpc.pxy_clientid_mutex);
1181 				pxy_exp->rpc.pxy_clientid = newcid;
1182 				pxy_exp->rpc.pxy_client_seqid = newseqid;
(18) Event cond_true: Condition "rc == 0", taking true branch.
(19) Event cond_true: Condition "!!(component_log_level[COMPONENT_RW_LOCK] >= NIV_FULL_DEBUG)", taking true branch.
(20) Event cond_true: Condition "!!(component_log_level[COMPONENT_RW_LOCK] >= NIV_FULL_DEBUG)", taking true branch.
(21) Event if_fallthrough: Falling through to end of if statement.
(22) Event if_end: End of if statement.
1183 				PTHREAD_MUTEX_unlock(&pxy_exp->rpc.pxy_clientid_mutex);
1184 			}
(23) Event loop: Jumping back to the beginning of the loop.
(47) Event loop: Looping back.
1185 		}
1186 		rcu_unregister_thread();
1187 		return NULL;
1188 	}
1189 	
1190 	static void free_io_contexts(struct pxy_export *pxy_exp)
1191 	{
1192 		struct glist_head *cur, *n;
1193 	
1194 		glist_for_each_safe(cur, n, &pxy_exp->rpc.free_contexts) {
1195 			struct pxy_rpc_io_context *c =
1196 			    container_of(cur, struct pxy_rpc_io_context, calls);
1197 	
1198 			glist_del(cur);
1199 			gsh_free(c);
1200 		}
1201 	}
1202 	
1203 	int pxy_close_thread(struct pxy_export *pxy_exp)
1204 	{
1205 		int rc;
1206 	
1207 		/* setting boolean to stop thread */
1208 		pxy_exp->rpc.close_thread = true;
1209 	
1210 		/* waiting threads ends */
1211 		/* pxy_clientid_renewer is usually waiting on sockless cond : wake up */
1212 		/* pxy_rpc_recv is usually polling rpc_sock : wake up by closing it */
1213 		PTHREAD_MUTEX_lock(&pxy_exp->rpc.listlock);
1214 		pthread_cond_broadcast(&pxy_exp->rpc.sockless);
1215 		close(pxy_exp->rpc.rpc_sock);
1216 		PTHREAD_MUTEX_unlock(&pxy_exp->rpc.listlock);
1217 		rc = pthread_join(pxy_exp->rpc.pxy_renewer_thread, NULL);
1218 		if (rc) {
1219 			LogWarn(COMPONENT_FSAL,
1220 				"Error on waiting the pxy_renewer_thread end : %d", rc);
1221 			return rc;
1222 		}
1223 	
1224 		rc = pthread_join(pxy_exp->rpc.pxy_recv_thread, NULL);
1225 		if (rc) {
1226 			LogWarn(COMPONENT_FSAL,
1227 				"Error on waiting the pxy_recv_thread end : %d", rc);
1228 			return rc;
1229 		}
1230 	
1231 		return 0;
1232 	}
1233 	
1234 	int pxy_init_rpc(struct pxy_export *pxy_exp)
1235 	{
1236 		int rc;
1237 		int i = NB_RPC_SLOT-1;
1238 	
1239 		PTHREAD_MUTEX_lock(&pxy_exp->rpc.listlock);
1240 		glist_init(&pxy_exp->rpc.rpc_calls);
1241 		PTHREAD_MUTEX_unlock(&pxy_exp->rpc.listlock);
1242 	
1243 		PTHREAD_MUTEX_lock(&pxy_exp->rpc.context_lock);
1244 		glist_init(&pxy_exp->rpc.free_contexts);
1245 		PTHREAD_MUTEX_unlock(&pxy_exp->rpc.context_lock);
1246 	
1247 	/**
1248 	 * @todo this lock is not really necessary so long as we can
1249 	 *       only do one export at a time.  This is a reminder that
1250 	 *       there is work to do to get this fnctn to truely be
1251 	 *       per export.
1252 	 */
1253 		PTHREAD_MUTEX_lock(&pxy_exp->rpc.listlock);
1254 		if (pxy_exp->rpc.rpc_xid == 0)
1255 			pxy_exp->rpc.rpc_xid = getpid() ^ time(NULL);
1256 		PTHREAD_MUTEX_unlock(&pxy_exp->rpc.listlock);
1257 		if (gethostname(pxy_exp->rpc.pxy_hostname,
1258 				sizeof(pxy_exp->rpc.pxy_hostname)))
1259 			strlcpy(pxy_exp->rpc.pxy_hostname, "NFS-GANESHA/Proxy",
1260 				sizeof(pxy_exp->rpc.pxy_hostname));
1261 	
1262 		for (i = NB_RPC_SLOT-1; i >= 0; i--) {
1263 			struct pxy_rpc_io_context *c =
1264 			    gsh_malloc(sizeof(*c) + pxy_exp->info.srv_sendsize +
1265 				       pxy_exp->info.srv_recvsize);
1266 			PTHREAD_MUTEX_init(&c->iolock, NULL);
1267 			PTHREAD_COND_init(&c->iowait, NULL);
1268 			c->nfs_prog = pxy_exp->info.srv_prognum;
1269 			c->sendbuf_sz = pxy_exp->info.srv_sendsize;
1270 			c->recvbuf_sz = pxy_exp->info.srv_recvsize;
1271 			c->sendbuf = (char *)(c + 1);
1272 			c->recvbuf = c->sendbuf + c->sendbuf_sz;
1273 			c->slotid = i;
1274 			c->seqid = 0;
1275 			c->iodone = false;
1276 	
1277 			PTHREAD_MUTEX_lock(&pxy_exp->rpc.context_lock);
1278 			glist_add(&pxy_exp->rpc.free_contexts, &c->calls);
1279 			PTHREAD_MUTEX_unlock(&pxy_exp->rpc.context_lock);
1280 		}
1281 	
1282 		rc = pthread_create(&pxy_exp->rpc.pxy_recv_thread, NULL, pxy_rpc_recv,
1283 				    (void *)pxy_exp);
1284 		if (rc) {
1285 			LogCrit(COMPONENT_FSAL,
1286 				"Cannot create proxy rpc receiver thread - %s",
1287 				strerror(rc));
1288 			free_io_contexts(pxy_exp);
1289 			return rc;
1290 		}
1291 	
1292 		rc = pthread_create(&pxy_exp->rpc.pxy_renewer_thread, NULL,
1293 				    pxy_clientid_renewer, (void *)pxy_exp);
1294 		if (rc) {
1295 			LogCrit(COMPONENT_FSAL,
1296 				"Cannot create proxy clientid renewer thread - %s",
1297 				strerror(rc));
1298 			free_io_contexts(pxy_exp);
1299 		}
1300 		return rc;
1301 	}
1302 	
1303 	static fsal_status_t pxy_make_object(struct fsal_export *export,
1304 					     fattr4 *obj_attributes,
1305 					     const nfs_fh4 *fh,
1306 					     struct fsal_obj_handle **handle,
1307 					     struct attrlist *attrs_out)
1308 	{
1309 		struct pxy_obj_handle *pxy_hdl;
1310 	
1311 		pxy_hdl = pxy_alloc_handle(export, fh, obj_attributes,
1312 					   attrs_out);
1313 		if (pxy_hdl == NULL)
1314 			return fsalstat(ERR_FSAL_FAULT, 0);
1315 		*handle = &pxy_hdl->obj;
1316 		return fsalstat(ERR_FSAL_NO_ERROR, 0);
1317 	}
1318 	
1319 	/*
1320 	 * cap maxread and maxwrite config values to background server values
1321 	 */
1322 	static void pxy_check_maxread_maxwrite(struct fsal_export *export, fattr4 *f4)
1323 	{
1324 		fsal_dynamicfsinfo_t info;
1325 		int rc;
1326 	
1327 		rc = nfs4_Fattr_To_fsinfo(&info, f4);
1328 		if (rc != NFS4_OK) {
1329 			LogWarn(COMPONENT_FSAL,
1330 				"Unable to get maxread and maxwrite from background NFS server : %d",
1331 				rc);
1332 		} else {
1333 			struct pxy_fsal_module *pm =
1334 			    container_of(export->fsal, struct pxy_fsal_module, module);
1335 	
1336 			if (info.maxread != 0 &&
1337 				pm->module.fs_info.maxread > info.maxread) {
1338 				LogWarn(COMPONENT_FSAL,
1339 					"Reduced maxread from %"PRIu64
1340 					" to align with remote server %"PRIu64,
1341 					pm->module.fs_info.maxread, info.maxread);
1342 				pm->module.fs_info.maxread = info.maxread;
1343 			}
1344 	
1345 			if (info.maxwrite != 0 &&
1346 			 pm->module.fs_info.maxwrite > info.maxwrite) {
1347 				LogWarn(COMPONENT_FSAL,
1348 					"Reduced maxwrite from %"PRIu64
1349 					" to align with remote server %"PRIu64,
1350 					pm->module.fs_info.maxwrite, info.maxwrite);
1351 				pm->module.fs_info.maxwrite = info.maxwrite;
1352 			}
1353 		}
1354 	}
1355 	
1356 	/*
1357 	 * NULL parent pointer is only used by lookup_path when it starts
1358 	 * from the root handle and has its own export pointer, everybody
1359 	 * else is supposed to provide a real parent pointer and matching
1360 	 * export
1361 	 */
1362 	static fsal_status_t pxy_lookup_impl(struct fsal_obj_handle *parent,
1363 					     struct fsal_export *export,
1364 					     const struct user_cred *cred,
1365 					     const char *path,
1366 					     struct fsal_obj_handle **handle,
1367 					     struct attrlist *attrs_out)
1368 	{
1369 		int rc;
1370 		uint32_t opcnt = 0;
1371 		GETATTR4resok *atok;
1372 		GETATTR4resok *atok_per_file_system_attr = NULL;
1373 		GETFH4resok *fhok;
1374 		sessionid4 sid;
1375 		/* SEQUENCE PUTROOTFH/PUTFH LOOKUP GETFH GETATTR (GETATTR) */
1376 	#define FSAL_LOOKUP_NB_OP_ALLOC 6
1377 		nfs_argop4 argoparray[FSAL_LOOKUP_NB_OP_ALLOC];
1378 		nfs_resop4 resoparray[FSAL_LOOKUP_NB_OP_ALLOC];
1379 		char fattr_blob[FATTR_BLOB_SZ];
1380 		char fattr_blob_per_file_system_attr[FATTR_BLOB_SZ];
1381 		char padfilehandle[NFS4_FHSIZE];
1382 	
1383 		if (!handle)
1384 			return fsalstat(ERR_FSAL_INVAL, 0);
1385 	
1386 		/* SEQUENCE */
1387 		pxy_get_client_sessionid(sid);
1388 		COMPOUNDV4_ARG_ADD_OP_SEQUENCE(opcnt, argoparray, sid, NB_RPC_SLOT);
1389 		if (!parent) {
1390 			COMPOUNDV4_ARG_ADD_OP_PUTROOTFH(opcnt, argoparray);
1391 		} else {
1392 			struct pxy_obj_handle *pxy_obj =
1393 			    container_of(parent, struct pxy_obj_handle, obj);
1394 			switch (parent->type) {
1395 			case DIRECTORY:
1396 				break;
1397 	
1398 			default:
1399 				return fsalstat(ERR_FSAL_NOTDIR, 0);
1400 			}
1401 	
1402 			COMPOUNDV4_ARG_ADD_OP_PUTFH(opcnt, argoparray, pxy_obj->fh4);
1403 		}
1404 	
1405 		if (path) {
1406 			if (!strcmp(path, ".")) {
1407 				if (!parent)
1408 					return fsalstat(ERR_FSAL_FAULT, 0);
1409 			} else if (!strcmp(path, "..")) {
1410 				if (!parent)
1411 					return fsalstat(ERR_FSAL_FAULT, 0);
1412 				COMPOUNDV4_ARG_ADD_OP_LOOKUPP(opcnt, argoparray);
1413 			} else {
1414 				COMPOUNDV4_ARG_ADD_OP_LOOKUP(opcnt, argoparray, path);
1415 			}
1416 		}
1417 	
1418 		fhok = &resoparray[opcnt].nfs_resop4_u.opgetfh.GETFH4res_u.resok4;
1419 		COMPOUNDV4_ARG_ADD_OP_GETFH(opcnt, argoparray);
1420 	
1421 		atok =
1422 		    pxy_fill_getattr_reply(resoparray + opcnt, fattr_blob,
1423 					   sizeof(fattr_blob));
1424 	
1425 		COMPOUNDV4_ARG_ADD_OP_GETATTR(opcnt, argoparray, pxy_bitmap_getattr);
1426 	
1427 		/* Dynamic ask of server per file system attr */
1428 		if (!parent) {
1429 			atok_per_file_system_attr =
1430 			    pxy_fill_getattr_reply(resoparray + opcnt,
1431 					fattr_blob_per_file_system_attr,
1432 					sizeof(fattr_blob_per_file_system_attr));
1433 			COMPOUNDV4_ARG_ADD_OP_GETATTR(opcnt, argoparray,
1434 						      pxy_bitmap_per_file_system_attr);
1435 		}
1436 		fhok->object.nfs_fh4_val = (char *)padfilehandle;
1437 		fhok->object.nfs_fh4_len = sizeof(padfilehandle);
1438 	
1439 		rc = pxy_nfsv4_call(cred, opcnt, argoparray, resoparray);
1440 		if (rc != NFS4_OK)
1441 			return nfsstat4_to_fsal(rc);
1442 	
1443 		/* Dynamic check of server per file system attr */
1444 		if (!parent) {
1445 			/* maxread and maxwrite */
1446 			pxy_check_maxread_maxwrite(export,
1447 					&atok_per_file_system_attr->obj_attributes);
1448 		}
1449 	
1450 		return pxy_make_object(export, &atok->obj_attributes, &fhok->object,
1451 				       handle, attrs_out);
1452 	}
1453 	
1454 	static fsal_status_t pxy_lookup(struct fsal_obj_handle *parent,
1455 					const char *path,
1456 					struct fsal_obj_handle **handle,
1457 					struct attrlist *attrs_out)
1458 	{
1459 		return pxy_lookup_impl(parent, op_ctx->fsal_export,
1460 				       op_ctx->creds, path, handle, attrs_out);
1461 	}
1462 	
/* TODO: make this per-export */
/* NOTE(review): file-scope counter, currently one instance for the whole
 * file; its readers and writers are not in this part of the file, so its
 * exact purpose should be confirmed there. */
static uint64_t fcnt;
1465 	
1466 	static fsal_status_t pxy_mkdir(struct fsal_obj_handle *dir_hdl,
1467 				       const char *name, struct attrlist *attrib,
1468 				       struct fsal_obj_handle **handle,
1469 				       struct attrlist *attrs_out)
1470 	{
1471 		int rc;
1472 		int opcnt = 0;
1473 		fattr4 input_attr;
1474 		char padfilehandle[NFS4_FHSIZE];
1475 		struct pxy_obj_handle *ph;
1476 		char fattr_blob[FATTR_BLOB_SZ];
1477 		GETATTR4resok *atok;
1478 		GETFH4resok *fhok;
1479 		fsal_status_t st;
1480 		sessionid4 sid;
1481 	#define FSAL_MKDIR_NB_OP_ALLOC 5 /* SEQUENCE PUTFH CREATE GETFH GETATTR */
1482 		nfs_argop4 argoparray[FSAL_MKDIR_NB_OP_ALLOC];
1483 		nfs_resop4 resoparray[FSAL_MKDIR_NB_OP_ALLOC];
1484 	
1485 		/*
1486 		 * The caller gives us partial attributes which include mode and owner
1487 		 * and expects the full attributes back at the end of the call.
1488 		 */
1489 		attrib->valid_mask &= ATTR_MODE | ATTR_OWNER | ATTR_GROUP;
1490 		if (pxy_fsalattr_to_fattr4(attrib, &input_attr) == -1)
1491 			return fsalstat(ERR_FSAL_INVAL, -1);
1492 	
1493 		ph = container_of(dir_hdl, struct pxy_obj_handle, obj);
1494 		/* SEQUENCE */
1495 		pxy_get_client_sessionid(sid);
1496 		COMPOUNDV4_ARG_ADD_OP_SEQUENCE(opcnt, argoparray, sid, NB_RPC_SLOT);
1497 		COMPOUNDV4_ARG_ADD_OP_PUTFH(opcnt, argoparray, ph->fh4);
1498 	
1499 		resoparray[opcnt].nfs_resop4_u.opcreate.CREATE4res_u.resok4.attrset =
1500 		    empty_bitmap;
1501 		COMPOUNDV4_ARG_ADD_OP_MKDIR(opcnt, argoparray, (char *)name,
1502 					    input_attr);
1503 	
1504 		fhok = &resoparray[opcnt].nfs_resop4_u.opgetfh.GETFH4res_u.resok4;
1505 		fhok->object.nfs_fh4_val = padfilehandle;
1506 		fhok->object.nfs_fh4_len = sizeof(padfilehandle);
1507 		COMPOUNDV4_ARG_ADD_OP_GETFH(opcnt, argoparray);
1508 	
1509 		atok =
1510 		    pxy_fill_getattr_reply(resoparray + opcnt, fattr_blob,
1511 					   sizeof(fattr_blob));
1512 		COMPOUNDV4_ARG_ADD_OP_GETATTR(opcnt, argoparray, pxy_bitmap_getattr);
1513 	
1514 		rc = pxy_nfsv4_call(op_ctx->creds, opcnt, argoparray, resoparray);
1515 		nfs4_Fattr_Free(&input_attr);
1516 		if (rc != NFS4_OK)
1517 			return nfsstat4_to_fsal(rc);
1518 	
1519 		st = pxy_make_object(op_ctx->fsal_export, &atok->obj_attributes,
1520 				     &fhok->object, handle, attrs_out);
1521 		if (FSAL_IS_ERROR(st))
1522 			return st;
1523 	
1524 		return (*handle)->obj_ops->getattrs(*handle, attrib);
1525 	}
1526 	
1527 	static fsal_status_t pxy_mknod(struct fsal_obj_handle *dir_hdl,
1528 				       const char *name, object_file_type_t nodetype,
1529 				       struct attrlist *attrib,
1530 				       struct fsal_obj_handle **handle,
1531 				       struct attrlist *attrs_out)
1532 	{
1533 		int rc;
1534 		int opcnt = 0;
1535 		fattr4 input_attr;
1536 		char padfilehandle[NFS4_FHSIZE];
1537 		struct pxy_obj_handle *ph;
1538 		char fattr_blob[FATTR_BLOB_SZ];
1539 		GETATTR4resok *atok;
1540 		GETFH4resok *fhok;
1541 		fsal_status_t st;
1542 		enum nfs_ftype4 nf4type;
1543 		specdata4 specdata = { 0, 0 };
1544 		sessionid4 sid;
1545 	#define FSAL_MKNOD_NB_OP_ALLOC 5 /* SEQUENCE PUTFH CREATE GETFH GETATTR */
1546 		nfs_argop4 argoparray[FSAL_MKNOD_NB_OP_ALLOC];
1547 		nfs_resop4 resoparray[FSAL_MKNOD_NB_OP_ALLOC];
1548 	
1549 		switch (nodetype) {
1550 		case CHARACTER_FILE:
1551 			specdata.specdata1 = attrib->rawdev.major;
1552 			specdata.specdata2 = attrib->rawdev.minor;
1553 			nf4type = NF4CHR;
1554 			break;
1555 		case BLOCK_FILE:
1556 			specdata.specdata1 = attrib->rawdev.major;
1557 			specdata.specdata2 = attrib->rawdev.minor;
1558 			nf4type = NF4BLK;
1559 			break;
1560 		case SOCKET_FILE:
1561 			nf4type = NF4SOCK;
1562 			break;
1563 		case FIFO_FILE:
1564 			nf4type = NF4FIFO;
1565 			break;
1566 		default:
1567 			return fsalstat(ERR_FSAL_FAULT, EINVAL);
1568 		}
1569 	
1570 		/*
1571 		 * The caller gives us partial attributes which include mode and owner
1572 		 * and expects the full attributes back at the end of the call.
1573 		 */
1574 		attrib->valid_mask &= ATTR_MODE | ATTR_OWNER | ATTR_GROUP;
1575 		if (pxy_fsalattr_to_fattr4(attrib, &input_attr) == -1)
1576 			return fsalstat(ERR_FSAL_INVAL, -1);
1577 	
1578 		ph = container_of(dir_hdl, struct pxy_obj_handle, obj);
1579 		/* SEQUENCE */
1580 		pxy_get_client_sessionid(sid);
1581 		COMPOUNDV4_ARG_ADD_OP_SEQUENCE(opcnt, argoparray, sid, NB_RPC_SLOT);
1582 		COMPOUNDV4_ARG_ADD_OP_PUTFH(opcnt, argoparray, ph->fh4);
1583 	
1584 		resoparray[opcnt].nfs_resop4_u.opcreate.CREATE4res_u.resok4.attrset =
1585 		    empty_bitmap;
1586 		COMPOUNDV4_ARG_ADD_OP_CREATE(opcnt, argoparray, (char *)name, nf4type,
1587 					     input_attr, specdata);
1588 	
1589 		fhok = &resoparray[opcnt].nfs_resop4_u.opgetfh.GETFH4res_u.resok4;
1590 		fhok->object.nfs_fh4_val = padfilehandle;
1591 		fhok->object.nfs_fh4_len = sizeof(padfilehandle);
1592 		COMPOUNDV4_ARG_ADD_OP_GETFH(opcnt, argoparray);
1593 	
1594 		atok =
1595 		    pxy_fill_getattr_reply(resoparray + opcnt, fattr_blob,
1596 					   sizeof(fattr_blob));
1597 		COMPOUNDV4_ARG_ADD_OP_GETATTR(opcnt, argoparray, pxy_bitmap_getattr);
1598 	
1599 		rc = pxy_nfsv4_call(op_ctx->creds, opcnt, argoparray, resoparray);
1600 		nfs4_Fattr_Free(&input_attr);
1601 		if (rc != NFS4_OK)
1602 			return nfsstat4_to_fsal(rc);
1603 	
1604 		st = pxy_make_object(op_ctx->fsal_export, &atok->obj_attributes,
1605 				     &fhok->object, handle, attrs_out);
1606 		if (FSAL_IS_ERROR(st))
1607 			return st;
1608 	
1609 		return (*handle)->obj_ops->getattrs(*handle, attrib);
1610 	}
1611 	
1612 	static fsal_status_t pxy_symlink(struct fsal_obj_handle *dir_hdl,
1613 					 const char *name, const char *link_path,
1614 					 struct attrlist *attrib,
1615 					 struct fsal_obj_handle **handle,
1616 					 struct attrlist *attrs_out)
1617 	{
1618 		int rc;
1619 		int opcnt = 0;
1620 		fattr4 input_attr;
1621 		char padfilehandle[NFS4_FHSIZE];
1622 		char fattr_blob[FATTR_BLOB_SZ];
1623 		sessionid4 sid;
1624 	#define FSAL_SYMLINK_NB_OP_ALLOC 5 /* SEQUENCE PUTFH CREATE GETFH GETATTR */
1625 		nfs_argop4 argoparray[FSAL_SYMLINK_NB_OP_ALLOC];
1626 		nfs_resop4 resoparray[FSAL_SYMLINK_NB_OP_ALLOC];
1627 		GETATTR4resok *atok;
1628 		GETFH4resok *fhok;
1629 		fsal_status_t st;
1630 		struct pxy_obj_handle *ph;
1631 	
1632 		/* Tests if symlinking is allowed by configuration. */
1633 		if (!op_ctx->fsal_export->exp_ops.fs_supports(op_ctx->fsal_export,
1634 							  fso_symlink_support))
1635 			return fsalstat(ERR_FSAL_NOTSUPP, ENOTSUP);
1636 	
1637 		attrib->valid_mask = ATTR_MODE;
1638 		if (pxy_fsalattr_to_fattr4(attrib, &input_attr) == -1)
1639 			return fsalstat(ERR_FSAL_INVAL, -1);
1640 	
1641 		ph = container_of(dir_hdl, struct pxy_obj_handle, obj);
1642 		/* SEQUENCE */
1643 		pxy_get_client_sessionid(sid);
1644 		COMPOUNDV4_ARG_ADD_OP_SEQUENCE(opcnt, argoparray, sid, NB_RPC_SLOT);
1645 		COMPOUNDV4_ARG_ADD_OP_PUTFH(opcnt, argoparray, ph->fh4);
1646 	
1647 		resoparray[opcnt].nfs_resop4_u.opcreate.CREATE4res_u.resok4.attrset =
1648 		    empty_bitmap;
1649 		COMPOUNDV4_ARG_ADD_OP_SYMLINK(opcnt, argoparray, (char *)name,
1650 					      (char *)link_path, input_attr);
1651 	
1652 		fhok = &resoparray[opcnt].nfs_resop4_u.opgetfh.GETFH4res_u.resok4;
1653 		fhok->object.nfs_fh4_val = padfilehandle;
1654 		fhok->object.nfs_fh4_len = sizeof(padfilehandle);
1655 		COMPOUNDV4_ARG_ADD_OP_GETFH(opcnt, argoparray);
1656 	
1657 		atok =
1658 		    pxy_fill_getattr_reply(resoparray + opcnt, fattr_blob,
1659 					   sizeof(fattr_blob));
1660 		COMPOUNDV4_ARG_ADD_OP_GETATTR(opcnt, argoparray, pxy_bitmap_getattr);
1661 	
1662 		rc = pxy_nfsv4_call(op_ctx->creds, opcnt, argoparray, resoparray);
1663 		nfs4_Fattr_Free(&input_attr);
1664 		if (rc != NFS4_OK)
1665 			return nfsstat4_to_fsal(rc);
1666 	
1667 		st = pxy_make_object(op_ctx->fsal_export, &atok->obj_attributes,
1668 				     &fhok->object, handle, attrs_out);
1669 		if (FSAL_IS_ERROR(st))
1670 			return st;
1671 	
1672 		return (*handle)->obj_ops->getattrs(*handle, attrib);
1673 	}
1674 	
1675 	static fsal_status_t pxy_readlink(struct fsal_obj_handle *obj_hdl,
1676 					  struct gsh_buffdesc *link_content,
1677 					  bool refresh)
1678 	{
1679 		int rc;
1680 		int opcnt = 0;
1681 		struct pxy_obj_handle *ph;
1682 		sessionid4 sid;
1683 	#define FSAL_READLINK_NB_OP_ALLOC 3 /* SEQUENCE PUTFH READLINK */
1684 		nfs_argop4 argoparray[FSAL_READLINK_NB_OP_ALLOC];
1685 		nfs_resop4 resoparray[FSAL_READLINK_NB_OP_ALLOC];
1686 		READLINK4resok *rlok;
1687 	
1688 		ph = container_of(obj_hdl, struct pxy_obj_handle, obj);
1689 		/* SEQUENCE */
1690 		pxy_get_client_sessionid(sid);
1691 		COMPOUNDV4_ARG_ADD_OP_SEQUENCE(opcnt, argoparray, sid, NB_RPC_SLOT);
1692 		COMPOUNDV4_ARG_ADD_OP_PUTFH(opcnt, argoparray, ph->fh4);
1693 	
1694 		/* This saves us from having to do one allocation for the XDR,
1695 		   another allocation for the return, and a copy just to get
1696 		   the \NUL terminator. The link length should be cached in
1697 		   the file handle. */
1698 	
1699 		link_content->len = fsal_default_linksize;
1700 		link_content->addr = gsh_malloc(link_content->len);
1701 	
1702 		rlok = &resoparray[opcnt].nfs_resop4_u.opreadlink.READLINK4res_u.resok4;
1703 		rlok->link.utf8string_val = link_content->addr;
1704 		rlok->link.utf8string_len = link_content->len;
1705 		COMPOUNDV4_ARG_ADD_OP_READLINK(opcnt, argoparray);
1706 	
1707 		rc = pxy_nfsv4_call(op_ctx->creds, opcnt, argoparray, resoparray);
1708 		if (rc != NFS4_OK) {
1709 			gsh_free(link_content->addr);
1710 			link_content->addr = NULL;
1711 			link_content->len = 0;
1712 			return nfsstat4_to_fsal(rc);
1713 		}
1714 	
1715 		rlok->link.utf8string_val[rlok->link.utf8string_len] = '\0';
1716 		link_content->len = rlok->link.utf8string_len + 1;
1717 		return fsalstat(ERR_FSAL_NO_ERROR, 0);
1718 	}
1719 	
1720 	static fsal_status_t pxy_link(struct fsal_obj_handle *obj_hdl,
1721 				      struct fsal_obj_handle *destdir_hdl,
1722 				      const char *name)
1723 	{
1724 		int rc;
1725 		struct pxy_obj_handle *tgt;
1726 		struct pxy_obj_handle *dst;
1727 		sessionid4 sid;
1728 	#define FSAL_LINK_NB_OP_ALLOC 5 /* SEQUENCE PUTFH SAVEFH PUTFH LINK */
1729 		nfs_argop4 argoparray[FSAL_LINK_NB_OP_ALLOC];
1730 		nfs_resop4 resoparray[FSAL_LINK_NB_OP_ALLOC];
1731 		int opcnt = 0;
1732 	
1733 		/* Tests if hardlinking is allowed by configuration. */
1734 		if (!op_ctx->fsal_export->exp_ops.fs_supports(op_ctx->fsal_export,
1735 							  fso_link_support))
1736 			return fsalstat(ERR_FSAL_NOTSUPP, ENOTSUP);
1737 	
1738 		tgt = container_of(obj_hdl, struct pxy_obj_handle, obj);
1739 		dst = container_of(destdir_hdl, struct pxy_obj_handle, obj);
1740 	
1741 		/* SEQUENCE */
1742 		pxy_get_client_sessionid(sid);
1743 		COMPOUNDV4_ARG_ADD_OP_SEQUENCE(opcnt, argoparray, sid, NB_RPC_SLOT);
1744 		COMPOUNDV4_ARG_ADD_OP_PUTFH(opcnt, argoparray, tgt->fh4);
1745 		COMPOUNDV4_ARG_ADD_OP_SAVEFH(opcnt, argoparray);
1746 		COMPOUNDV4_ARG_ADD_OP_PUTFH(opcnt, argoparray, dst->fh4);
1747 		COMPOUNDV4_ARG_ADD_OP_LINK(opcnt, argoparray, (char *)name);
1748 	
1749 		rc = pxy_nfsv4_call(op_ctx->creds, opcnt, argoparray, resoparray);
1750 		return nfsstat4_to_fsal(rc);
1751 	}
1752 	
1753 	#define FSAL_READDIR_NB_OP_ALLOC 3 /* SEQUENCE PUTFH READDIR */
1754 	static bool xdr_readdirres(XDR *x, nfs_resop4 *rdres)
1755 	{
1756 		int i;
1757 		int res = true;
1758 	
1759 		for (i = 0; i < FSAL_READDIR_NB_OP_ALLOC; i++) {
1760 			res  = xdr_nfs_resop4(x, rdres + i);
1761 			if (res != true)
1762 				return res;
1763 		}
1764 	
1765 		return res;
1766 	}
1767 	
1768 	/*
1769 	 * Trying to guess how many entries can fit into a readdir buffer
1770 	 * is complicated and usually results in either gross over-allocation
1771 	 * of the memory for results or under-allocation (on large directories)
1772 	 * and buffer overruns - just pay the price of allocating the memory
1773 	 * inside XDR decoding and free it when done
1774 	 */
1775 	static fsal_status_t pxy_do_readdir(struct pxy_obj_handle *ph,
1776 					    nfs_cookie4 *cookie, fsal_readdir_cb cb,
1777 					    void *cbarg, attrmask_t attrmask,
1778 					    bool *eof, bool *again)
1779 	{
1780 		uint32_t opcnt = 0;
1781 		int rc;
1782 		entry4 *e4;
1783 		sessionid4 sid;
1784 		nfs_argop4 argoparray[FSAL_READDIR_NB_OP_ALLOC];
1785 		nfs_resop4 resoparray[FSAL_READDIR_NB_OP_ALLOC];
1786 		READDIR4resok *rdok;
1787 		fsal_status_t st = { ERR_FSAL_NO_ERROR, 0 };
1788 	
1789 		/* SEQUENCE */
1790 		pxy_get_client_sessionid(sid);
1791 		COMPOUNDV4_ARG_ADD_OP_SEQUENCE(opcnt, argoparray, sid, NB_RPC_SLOT);
1792 		/* PUTFH */
1793 		COMPOUNDV4_ARG_ADD_OP_PUTFH(opcnt, argoparray, ph->fh4);
1794 		rdok = &resoparray[opcnt].nfs_resop4_u.opreaddir.READDIR4res_u.resok4;
1795 		rdok->reply.entries = NULL;
1796 		/* READDIR */
1797 		COMPOUNDV4_ARG_ADD_OP_READDIR(opcnt, argoparray, *cookie,
1798 					      pxy_bitmap_getattr);
1799 	
1800 		rc = pxy_nfsv4_call(op_ctx->creds, opcnt, argoparray, resoparray);
1801 		if (rc != NFS4_OK)
1802 			return nfsstat4_to_fsal(rc);
1803 	
1804 		*eof = rdok->reply.eof;
1805 	
1806 		for (e4 = rdok->reply.entries; e4; e4 = e4->nextentry) {
1807 			struct attrlist attrs;
1808 			char name[MAXNAMLEN + 1];
1809 			struct fsal_obj_handle *handle;
1810 			enum fsal_dir_result cb_rc;
1811 	
1812 			/* UTF8 name does not include trailing 0 */
1813 			if (e4->name.utf8string_len > sizeof(name) - 1)
1814 				return fsalstat(ERR_FSAL_SERVERFAULT, E2BIG);
1815 			memcpy(name, e4->name.utf8string_val, e4->name.utf8string_len);
1816 			name[e4->name.utf8string_len] = '\0';
1817 	
1818 			if (nfs4_Fattr_To_FSAL_attr(&attrs, &e4->attrs, NULL))
1819 				return fsalstat(ERR_FSAL_FAULT, 0);
1820 	
1821 			/*
1822 			 *  If *again==false : we are in readahead,
1823 			 *  we only call cb for next entries but don't update result
1824 			 *  for calling readdir.
1825 			 */
1826 			if (*again) {
1827 				*cookie = e4->cookie;
1828 				*eof = rdok->reply.eof && !e4->nextentry;
1829 			}
1830 	
1831 			/** @todo FSF: this could be handled by getting handle as part
1832 			 *             of readdir attributes. However, if acl is
1833 			 *             requested, we might get it separately to
1834 			 *             avoid over large READDIR response.
1835 			 */
1836 			st = pxy_lookup_impl(&ph->obj, op_ctx->fsal_export,
1837 					     op_ctx->creds, name, &handle, NULL);
1838 			if (FSAL_IS_ERROR(st))
1839 				break;
1840 	
1841 			cb_rc = cb(name, handle, &attrs, cbarg, e4->cookie);
1842 	
1843 			fsal_release_attrs(&attrs);
1844 	
1845 			if (cb_rc >= DIR_TERMINATE) {
1846 				*again = false;
1847 				break;
1848 			}
1849 			/*
1850 			 * Read ahead is supported by this FSAL
1851 			 * but limited to the current background readdir request.
1852 			 */
1853 			if (cb_rc >= DIR_READAHEAD && *again) {
1854 				*again = false;
1855 			}
1856 		}
1857 		xdr_free((xdrproc_t) xdr_readdirres, resoparray);
1858 		return st;
1859 	}
1860 	
1861 	/**
1862 	 *  @todo We might add a verifier to the cookie provided
1863 	 *        if server needs one ...
1864 	 */
1865 	static fsal_status_t pxy_readdir(struct fsal_obj_handle *dir_hdl,
1866 					 fsal_cookie_t *whence, void *cbarg,
1867 					 fsal_readdir_cb cb, attrmask_t attrmask,
1868 					 bool *eof)
1869 	{
1870 		nfs_cookie4 cookie = 0;
1871 		struct pxy_obj_handle *ph;
1872 		bool again = true;
1873 	
1874 		if (whence)
1875 			cookie = (nfs_cookie4) *whence;
1876 	
1877 		ph = container_of(dir_hdl, struct pxy_obj_handle, obj);
1878 	
1879 		do {
1880 			fsal_status_t st;
1881 	
1882 			st = pxy_do_readdir(ph, &cookie, cb, cbarg, attrmask, eof,
1883 					    &again);
1884 			if (FSAL_IS_ERROR(st))
1885 				return st;
1886 		} while (*eof == false && again);
1887 	
1888 		return fsalstat(ERR_FSAL_NO_ERROR, 0);
1889 	}
1890 	
1891 	static fsal_status_t pxy_rename(struct fsal_obj_handle *obj_hdl,
1892 					struct fsal_obj_handle *olddir_hdl,
1893 					const char *old_name,
1894 					struct fsal_obj_handle *newdir_hdl,
1895 					const char *new_name)
1896 	{
1897 		int rc;
1898 		int opcnt = 0;
1899 		sessionid4 sid;
1900 	#define FSAL_RENAME_NB_OP_ALLOC 5 /* SEQUENCE PUTFH SAVEFH PUTFH RENAME */
1901 		nfs_argop4 argoparray[FSAL_RENAME_NB_OP_ALLOC];
1902 		nfs_resop4 resoparray[FSAL_RENAME_NB_OP_ALLOC];
1903 		struct pxy_obj_handle *src;
1904 		struct pxy_obj_handle *tgt;
1905 	
1906 		src = container_of(olddir_hdl, struct pxy_obj_handle, obj);
1907 		tgt = container_of(newdir_hdl, struct pxy_obj_handle, obj);
1908 		/* SEQUENCE */
1909 		pxy_get_client_sessionid(sid);
1910 		COMPOUNDV4_ARG_ADD_OP_SEQUENCE(opcnt, argoparray, sid, NB_RPC_SLOT);
1911 		COMPOUNDV4_ARG_ADD_OP_PUTFH(opcnt, argoparray, src->fh4);
1912 		COMPOUNDV4_ARG_ADD_OP_SAVEFH(opcnt, argoparray);
1913 		COMPOUNDV4_ARG_ADD_OP_PUTFH(opcnt, argoparray, tgt->fh4);
1914 		COMPOUNDV4_ARG_ADD_OP_RENAME(opcnt, argoparray, (char *)old_name,
1915 					     (char *)new_name);
1916 	
1917 		rc = pxy_nfsv4_call(op_ctx->creds, opcnt, argoparray, resoparray);
1918 		return nfsstat4_to_fsal(rc);
1919 	}
1920 	
1921 	static inline int nfs4_Fattr_To_FSAL_attr_savreqmask(struct attrlist *FSAL_attr,
1922 							     fattr4 *Fattr,
1923 							     compound_data_t *data)
1924 	{
1925 		int rc = 0;
1926 		attrmask_t saved_request_mask = FSAL_attr->request_mask;
1927 	
1928 		rc = nfs4_Fattr_To_FSAL_attr(FSAL_attr, Fattr, data);
1929 		FSAL_attr->request_mask = saved_request_mask;
1930 		return rc;
1931 	}
1932 	
1933 	static fsal_status_t pxy_getattrs(struct fsal_obj_handle *obj_hdl,
1934 					  struct attrlist *attrs)
1935 	{
1936 		struct pxy_obj_handle *ph;
1937 		int rc;
1938 		uint32_t opcnt = 0;
1939 		sessionid4 sid;
1940 	#define FSAL_GETATTR_NB_OP_ALLOC 3 /* SEQUENCE PUTFH GETATTR */
1941 		nfs_argop4 argoparray[FSAL_GETATTR_NB_OP_ALLOC];
1942 		nfs_resop4 resoparray[FSAL_GETATTR_NB_OP_ALLOC];
1943 		GETATTR4resok *atok;
1944 		char fattr_blob[FATTR_BLOB_SZ];
1945 	
1946 		ph = container_of(obj_hdl, struct pxy_obj_handle, obj);
1947 	
1948 		/* SEQUENCE */
1949 		pxy_get_client_sessionid(sid);
1950 		COMPOUNDV4_ARG_ADD_OP_SEQUENCE(opcnt, argoparray, sid, NB_RPC_SLOT);
1951 		COMPOUNDV4_ARG_ADD_OP_PUTFH(opcnt, argoparray, ph->fh4);
1952 	
1953 		atok = pxy_fill_getattr_reply(resoparray + opcnt, fattr_blob,
1954 					      sizeof(fattr_blob));
1955 		COMPOUNDV4_ARG_ADD_OP_GETATTR(opcnt, argoparray, pxy_bitmap_getattr);
1956 	
1957 		rc = pxy_nfsv4_call(op_ctx->creds, opcnt, argoparray, resoparray);
1958 	
1959 		if (rc != NFS4_OK) {
1960 			if (attrs->request_mask & ATTR_RDATTR_ERR) {
1961 				/* Caller asked for error to be visible. */
1962 				attrs->valid_mask = ATTR_RDATTR_ERR;
1963 			}
1964 			return nfsstat4_to_fsal(rc);
1965 		}
1966 	
1967 		if (nfs4_Fattr_To_FSAL_attr_savreqmask(attrs, &atok->obj_attributes,
1968 						       NULL) != NFS4_OK)
1969 			return fsalstat(ERR_FSAL_INVAL, 0);
1970 	
1971 		return fsalstat(ERR_FSAL_NO_ERROR, 0);
1972 	}
1973 	
1974 	static fsal_status_t pxy_unlink(struct fsal_obj_handle *dir_hdl,
1975 					struct fsal_obj_handle *obj_hdl,
1976 					const char *name)
1977 	{
1978 		int opcnt = 0;
1979 		int rc;
1980 		struct pxy_obj_handle *ph;
1981 		sessionid4 sid;
1982 	#define FSAL_UNLINK_NB_OP_ALLOC 4 /* SEQUENCE PUTFH REMOVE GETATTR */
1983 		nfs_argop4 argoparray[FSAL_UNLINK_NB_OP_ALLOC];
1984 		nfs_resop4 resoparray[FSAL_UNLINK_NB_OP_ALLOC];
1985 	#if GETATTR_AFTER
1986 		GETATTR4resok *atok;
1987 		char fattr_blob[FATTR_BLOB_SZ];
1988 		struct attrlist dirattr;
1989 	#endif
1990 	
1991 		ph = container_of(dir_hdl, struct pxy_obj_handle, obj);
1992 		/* SEQUENCE */
1993 		pxy_get_client_sessionid(sid);
1994 		COMPOUNDV4_ARG_ADD_OP_SEQUENCE(opcnt, argoparray, sid, NB_RPC_SLOT);
1995 		COMPOUNDV4_ARG_ADD_OP_PUTFH(opcnt, argoparray, ph->fh4);
1996 		COMPOUNDV4_ARG_ADD_OP_REMOVE(opcnt, argoparray, (char *)name);
1997 	
1998 	#if GETATTR_AFTER
1999 		atok =
2000 		    pxy_fill_getattr_reply(resoparray + opcnt, fattr_blob,
2001 					   sizeof(fattr_blob));
2002 		COMPOUNDV4_ARG_ADD_OP_GETATTR(opcnt, argoparray, pxy_bitmap_getattr);
2003 	#endif
2004 	
2005 		rc = pxy_nfsv4_call(op_ctx->creds, opcnt, argoparray, resoparray);
2006 		if (rc != NFS4_OK)
2007 			return nfsstat4_to_fsal(rc);
2008 	
2009 	#if GETATTR_AFTER
2010 		if (nfs4_Fattr_To_FSAL_attr(&dirattr, &atok->obj_attributes, NULL) ==
2011 		    NFS4_OK)
2012 			ph->attributes = dirattr;
2013 	#endif
2014 	
2015 		return fsalstat(ERR_FSAL_NO_ERROR, 0);
2016 	}
2017 	
2018 	static fsal_status_t pxy_handle_to_wire(const struct fsal_obj_handle *obj_hdl,
2019 						fsal_digesttype_t output_type,
2020 						struct gsh_buffdesc *fh_desc)
2021 	{
2022 		struct pxy_obj_handle *ph =
2023 		    container_of(obj_hdl, struct pxy_obj_handle, obj);
2024 		size_t fhs;
2025 		void *data;
2026 	
2027 		/* sanity checks */
2028 		if (!fh_desc || !fh_desc->addr)
2029 			return fsalstat(ERR_FSAL_FAULT, 0);
2030 	
2031 		switch (output_type) {
2032 		case FSAL_DIGEST_NFSV3:
2033 	#ifdef PROXY_HANDLE_MAPPING
2034 			fhs = sizeof(ph->h23);
2035 			data = &ph->h23;
2036 			break;
2037 	#endif
2038 		case FSAL_DIGEST_NFSV4:
2039 			fhs = ph->blob.len;
2040 			data = &ph->blob;
2041 			break;
2042 		default:
2043 			return fsalstat(ERR_FSAL_SERVERFAULT, 0);
2044 		}
2045 	
2046 		if (fh_desc->len < fhs)
2047 			return fsalstat(ERR_FSAL_TOOSMALL, 0);
2048 		memcpy(fh_desc->addr, data, fhs);
2049 		fh_desc->len = fhs;
2050 		return fsalstat(ERR_FSAL_NO_ERROR, 0);
2051 	}
2052 	
2053 	static void pxy_handle_to_key(struct fsal_obj_handle *obj_hdl,
2054 				      struct gsh_buffdesc *fh_desc)
2055 	{
2056 		struct pxy_obj_handle *ph =
2057 		    container_of(obj_hdl, struct pxy_obj_handle, obj);
2058 		fh_desc->addr = &ph->blob;
2059 		fh_desc->len = ph->blob.len;
2060 	}
2061 	
2062 	static void pxy_hdl_release(struct fsal_obj_handle *obj_hdl)
2063 	{
2064 		struct pxy_obj_handle *ph =
2065 		    container_of(obj_hdl, struct pxy_obj_handle, obj);
2066 	
2067 		fsal_obj_handle_fini(obj_hdl);
2068 	
2069 		gsh_free(ph);
2070 	}
2071 	
2072 	/*
2073 	 * In this first FSAL_PROXY support_ex version without state
2074 	 * nothing to do on close.
2075 	 */
2076 	static fsal_status_t pxy_close(struct fsal_obj_handle *obj_hdl)
2077 	{
2078 		return fsalstat(ERR_FSAL_NOT_OPENED, 0);
2079 	}
2080 	
/*
 * support_ex methods
 *
 * This first, rough version of support_ex in FSAL_PROXY does not take care
 * of any state.
 *
 * Its only goal is to be compliant with the support_ex FSAL API.
 */
2090 	
2091 	/**
2092 	 * @brief Fill share_access and share_deny fields of an OPEN4args struct
2093 	 *
2094 	 * This function fills share_access and share_deny fields of an OPEN4args
2095 	 * struct to prepare an OPEN v4.1 call.
2096 	 *
2097 	 * @param[in]     openflags      fsal open flags
2098 	 * @param[in,out] share_access   share_access field to be filled
2099 	 * @param[in,out] share_deny     share_deny field to be filled
2100 	 *
2101 	 * @return FSAL status.
2102 	 */
2103 	static fsal_status_t  fill_share_OPEN4args(uint32_t *share_access,
2104 						   uint32_t *share_deny,
2105 						   fsal_openflags_t openflags)
2106 	{
2107 	
2108 		/* share access */
2109 		*share_access = 0;
2110 		if (openflags & FSAL_O_READ)
2111 			*share_access |= OPEN4_SHARE_ACCESS_READ;
2112 		if (openflags & FSAL_O_WRITE)
2113 			*share_access |= OPEN4_SHARE_ACCESS_WRITE;
2114 		/* share write */
2115 		*share_deny = OPEN4_SHARE_DENY_NONE;
2116 		if (openflags & FSAL_O_DENY_READ)
2117 			*share_deny |= OPEN4_SHARE_DENY_READ;
2118 		if (openflags & FSAL_O_DENY_WRITE ||
2119 		    openflags & FSAL_O_DENY_WRITE_MAND)
2120 			*share_deny |= OPEN4_SHARE_DENY_WRITE;
2121 	
2122 		return fsalstat(ERR_FSAL_NO_ERROR, 0);
2123 	}
2124 	
2125 	/**
2126 	 * @brief Fill openhow field of an OPEN4args struct
2127 	 *
2128 	 * This function fills an openflags4 openhow field of an OPEN4args struct
2129 	 * to prepare an OPEN v4.1 call.
2130 	 *
2131 	 * @param[in]     attrs_in       open atributes
2132 	 * @param[in]     create_mode    open create mode
2133 	 * @param[in]     verifier       open verifier
2134 	 * @param[in,out] openhow        openhow field to be filled
2135 	 * @param[in,out] inattrs        created inattrs (need to be freed by calling
2136 	 *                               nfs4_Fattr_Free)
2137 	 * @param[out]    setattr_needed an additional setattr op is needed
2138 	 *
2139 	 * @return FSAL status.
2140 	 */
2141 	static fsal_status_t fill_openhow_OPEN4args(openflag4 *openhow,
2142 						    fattr4 inattrs,
2143 						    enum fsal_create_mode createmode,
2144 						    fsal_verifier_t verifier,
2145 						    bool *setattr_needed,
2146 						    const char *name,
2147 						    fsal_openflags_t openflags)
2148 	{
2149 		if (openhow == NULL)
2150 			return fsalstat(ERR_FSAL_INVAL, -1);
2151 	
2152 		/* openhow */
2153 		/* not an open by handle and flag create */
2154 		if (name && createmode != FSAL_NO_CREATE) {
2155 			createhow4 *how = &(openhow->openflag4_u.how);
2156 	
2157 			openhow->opentype = OPEN4_CREATE;
2158 			switch (createmode) {
2159 			case FSAL_UNCHECKED:
2160 				how->mode = UNCHECKED4;
2161 				how->createhow4_u.createattrs = inattrs;
2162 				break;
2163 			case FSAL_GUARDED:
2164 			case FSAL_EXCLUSIVE_9P:
2165 				how->mode = GUARDED4;
2166 				how->createhow4_u.createattrs = inattrs;
2167 				break;
2168 			case FSAL_EXCLUSIVE:
2169 				how->mode = EXCLUSIVE4;
2170 				FSAL_VERIFIER_T_TO_VERIFIER4(
2171 					how->createhow4_u.createverf, verifier);
2172 				/* no way to set attr in same op than old EXCLUSIVE4 */
2173 				if (inattrs.attrmask.bitmap4_len > 0) {
2174 					int i = 0;
2175 	
2176 					for (i = 0; i < inattrs.attrmask.bitmap4_len;
2177 					     i++) {
2178 						if (inattrs.attrmask.map[i]) {
2179 							*setattr_needed = true;
2180 							break;
2181 						}
2182 					}
2183 				}
2184 				break;
2185 			case FSAL_EXCLUSIVE_41:
2186 				how->mode = EXCLUSIVE4_1;
2187 				FSAL_VERIFIER_T_TO_VERIFIER4(
2188 					how->createhow4_u.ch_createboth.cva_verf,
2189 					verifier);
2190 				how->createhow4_u.ch_createboth.cva_attrs = inattrs;
2191 			/*
2192 			 * We assume verifier is stored in time metadata.
2193 			 *
2194 			 * We had better check suppattr_exclcreat from background
2195 			 * server.
2196 			 */
2197 				if (inattrs.attrmask.bitmap4_len >= 2 &&
2198 		inattrs.attrmask.map[1] & (1U << (FATTR4_TIME_METADATA - 32))) {
2199 					*setattr_needed = true;
2200 			how->createhow4_u.ch_createboth.cva_attrs.attrmask.map[1] &=
2201 						~(1U << (FATTR4_TIME_METADATA - 32));
2202 				}
2203 				break;
2204 			default:
2205 				return fsalstat(ERR_FSAL_FAULT,
2206 						EINVAL);
2207 			}
2208 		} else
2209 			openhow->opentype = OPEN4_NOCREATE;
2210 	
2211 		/* include open by handle and TRUNCATE case in setattr_needed */
2212 		if (!name && openflags & FSAL_O_TRUNC)
2213 			*setattr_needed = true;
2214 	
2215 		return fsalstat(ERR_FSAL_NO_ERROR, 0);
2216 	}
2217 	
2218 	static fsal_status_t pxy_open2(struct fsal_obj_handle *obj_hdl,
2219 				       struct state_t *state,
2220 				       fsal_openflags_t openflags,
2221 				       enum fsal_create_mode createmode,
2222 				       const char *name,
2223 				       struct attrlist *attrs_in,
2224 				       fsal_verifier_t verifier,
2225 				       struct fsal_obj_handle **new_obj,
2226 				       struct attrlist *attrs_out,
2227 				       bool *caller_perm_check)
2228 	{
2229 		struct pxy_obj_handle *ph;
2230 		int rc; /* return code of nfs call */
2231 		int opcnt = 0; /* nfs arg counter */
2232 		fsal_status_t st; /* return code of fsal call */
2233 		char padfilehandle[NFS4_FHSIZE]; /* gotten FH */
2234 		char owner_val[128];
2235 		unsigned int owner_len = 0;
2236 		uint32_t share_access = 0;
2237 		uint32_t share_deny = 0;
2238 		openflag4 openhow;
2239 		fattr4 inattrs;
2240 		open_claim4 claim;
2241 		sessionid4 sid;
2242 		/* SEQUENCE, PUTFH, OPEN, GETFH, GETATTR */
2243 		#define FSAL_OPEN_NB_OP 5
2244 		nfs_argop4 argoparray[FSAL_OPEN_NB_OP];
2245 		nfs_resop4 resoparray[FSAL_OPEN_NB_OP];
2246 		/* SEQUENCE, PUTFH, SETATTR, GETATTR */
2247 		#define FSAL_OPEN_SETATTR_NB_OP 4
2248 		nfs_argop4 setattr_argoparray[FSAL_OPEN_SETATTR_NB_OP];
2249 		nfs_resop4 setattr_resoparray[FSAL_OPEN_SETATTR_NB_OP];
2250 		OPEN4resok *opok;
2251 		GETFH4resok *fhok;
2252 		GETATTR4resok *atok;
2253 		char fattr_blob[FATTR_BLOB_SZ];
2254 		bool setattr_needed = false;
2255 	
2256 		/* we have not done yet any check */
2257 		*caller_perm_check = true;
2258 	
2259 		/* get back proxy handle */
2260 		ph = container_of(obj_hdl, struct pxy_obj_handle, obj);
2261 	
2262 		/* include TRUNCATE case in attrs_in */
2263 		if (openflags & FSAL_O_TRUNC) {
2264 			attrs_in->valid_mask |= ATTR_SIZE;
2265 			attrs_in->filesize = 0;
2266 		}
2267 	
2268 		/* fill inattrs */
2269 		if (pxy_fsalattr_to_fattr4(attrs_in, &inattrs) == -1)
2270 			return fsalstat(ERR_FSAL_INVAL, -1);
2271 	
2272 		/* Three cases need to do an open :
2273 		 * -open by name to get an handle
2274 		 * -open by handle to get attrs_out
2275 		 * -open by handle to truncate
2276 		 */
2277 		if (name || attrs_out || openflags & FSAL_O_TRUNC) {
2278 			/*
2279 			* We do the open to get handle, check perm, check share, trunc,
2280 			* create if needed ...
2281 			*/
2282 			/* open call will do the perm check */
2283 			*caller_perm_check = false;
2284 	
2285 			/* prepare open call */
2286 			/* SEQUENCE */
2287 			pxy_get_client_sessionid(sid);
2288 			COMPOUNDV4_ARG_ADD_OP_SEQUENCE(opcnt, argoparray, sid,
2289 						       NB_RPC_SLOT);
2290 	
2291 			/* PUTFH */
2292 			COMPOUNDV4_ARG_ADD_OP_PUTFH(opcnt, argoparray, ph->fh4);
2293 	
2294 			/* OPEN */
2295 			/* prepare answer */
2296 			opok =
2297 			    &resoparray[opcnt].nfs_resop4_u.opopen.OPEN4res_u.resok4;
2298 			opok->rflags = 0; /* set to NULL for safety */
2299 			opok->attrset = empty_bitmap; /* set to empty for safety */
2300 			/* prepare open input args */
2301 			/* share_access and share_deny */
2302 			st = fill_share_OPEN4args(&share_access, &share_deny,
2303 						  openflags);
2304 			if (FSAL_IS_ERROR(st)) {
2305 				nfs4_Fattr_Free(&inattrs);
2306 				return st;
2307 			}
2308 	
2309 			/* owner */
2310 			snprintf(owner_val, sizeof(owner_val),
2311 				 "GANESHA/PROXY: pid=%u %" PRIu64, getpid(),
2312 				 atomic_inc_uint64_t(&fcnt));
2313 			owner_len = strnlen(owner_val, sizeof(owner_val));
2314 			/* inattrs and openhow */
2315 			st = fill_openhow_OPEN4args(&openhow, inattrs, createmode,
2316 						    verifier, &setattr_needed, name,
2317 						    openflags);
2318 			if (FSAL_IS_ERROR(st)) {
2319 				nfs4_Fattr_Free(&inattrs);
2320 				return st;
2321 			}
2322 	
2323 			/* claim : first support_ex version, no state -> no claim */
2324 			if (name) {
2325 			/* open by name */
2326 				claim.claim = CLAIM_NULL;
2327 				claim.open_claim4_u.file.utf8string_val = (char *)name;
2328 				claim.open_claim4_u.file.utf8string_len = strlen(name);
2329 			} else {
2330 			/* open by handle */
2331 				claim.claim = CLAIM_FH;
2332 			}
2333 			/* add open */
2334 			COMPOUNDV4_ARGS_ADD_OP_OPEN_4_1(opcnt, argoparray, share_access,
2335 							share_deny, owner_val,
2336 							owner_len, openhow, claim);
2337 	
2338 			/* GETFH */
2339 			/* prepare answer */
2340 			fhok =
2341 			    &resoparray[opcnt].nfs_resop4_u.opgetfh.GETFH4res_u.resok4;
2342 			fhok->object.nfs_fh4_val = padfilehandle;
2343 			fhok->object.nfs_fh4_len = sizeof(padfilehandle);
2344 			COMPOUNDV4_ARG_ADD_OP_GETFH(opcnt, argoparray);
2345 			if (!setattr_needed && (new_obj || attrs_out)) {
2346 				/* GETATTR */
2347 				atok = pxy_fill_getattr_reply(resoparray + opcnt,
2348 							      fattr_blob,
2349 							      sizeof(fattr_blob));
2350 				COMPOUNDV4_ARG_ADD_OP_GETATTR(opcnt, argoparray,
2351 							      pxy_bitmap_getattr);
2352 			}
2353 			/* nfs call*/
2354 			rc = pxy_nfsv4_call(op_ctx->creds, opcnt, argoparray,
2355 					    resoparray);
2356 			if (rc != NFS4_OK) {
2357 				nfs4_Fattr_Free(&inattrs);
2358 				return nfsstat4_to_fsal(rc);
2359 			}
2360 	
2361 	
2362 			/* update stateid in current state */
2363 			if (state) {
2364 				struct pxy_state *pxy_state_id = container_of(state,
2365 							struct pxy_state, state);
2366 	
2367 				pxy_state_id->stateid = opok->stateid;
2368 			}
2369 		}
2370 	
2371 		if (setattr_needed) {
2372 			opcnt = 0;
2373 			/* SEQUENCE */
2374 			pxy_get_client_sessionid(sid);
2375 			COMPOUNDV4_ARG_ADD_OP_SEQUENCE(opcnt, setattr_argoparray, sid,
2376 						       NB_RPC_SLOT);
2377 			/* PUTFH */
2378 			COMPOUNDV4_ARG_ADD_OP_PUTFH(opcnt, setattr_argoparray,
2379 						    fhok->object);
2380 			/* SETATTR for truncate */
2381 			setattr_resoparray[opcnt].nfs_resop4_u.opsetattr.attrsset =
2382 									empty_bitmap;
2383 			/* We have a stateid */
2384 			/* cause we did an open when we set setattr_needed. */
2385 			COMPOUNDV4_ARG_ADD_OP_SETATTR(opcnt, setattr_argoparray,
2386 						      inattrs, opok->stateid.other);
2387 	
2388 			if (new_obj || attrs_out) {
2389 				/* GETATTR */
2390 				atok = pxy_fill_getattr_reply(
2391 							setattr_resoparray + opcnt,
2392 							fattr_blob,
2393 							sizeof(fattr_blob));
2394 				COMPOUNDV4_ARG_ADD_OP_GETATTR(opcnt, setattr_argoparray,
2395 							      pxy_bitmap_getattr);
2396 			}
2397 	
2398 			/* nfs call*/
2399 			rc = pxy_nfsv4_call(op_ctx->creds, opcnt, setattr_argoparray,
2400 					    setattr_resoparray);
2401 			if (rc != NFS4_OK) {
2402 				nfs4_Fattr_Free(&inattrs);
2403 				return nfsstat4_to_fsal(rc);
2404 			}
2405 		}
2406 	
2407 		/* clean inattrs */
2408 		nfs4_Fattr_Free(&inattrs);
2409 	
2410 		/* create a new object if asked and attrs_out by the way */
2411 		if (new_obj) {
2412 			if (name) {
2413 				/* create new_obj and set attrs_out*/
2414 				st = pxy_make_object(op_ctx->fsal_export,
2415 						     &atok->obj_attributes,
2416 						     &fhok->object,
2417 						     new_obj, attrs_out);
2418 				if (FSAL_IS_ERROR(st))
2419 					return st;
2420 			} else {
2421 				*new_obj = obj_hdl;
2422 			}
2423 		}
2424 	
2425 		/* set attrs_out if needed and not yet done by creating new object */
2426 		if (attrs_out && (!new_obj || (new_obj && !name))) {
2427 			rc = nfs4_Fattr_To_FSAL_attr(attrs_out, &atok->obj_attributes,
2428 						     NULL);
2429 			if (rc != NFS4_OK)
2430 				return nfsstat4_to_fsal(rc);
2431 		}
2432 	
2433 		return fsalstat(ERR_FSAL_NO_ERROR, 0);
2434 	}
2435 	
2436 	/* XXX Note that this only currently supports a vector size of 1 */
2437 	static void pxy_read2(struct fsal_obj_handle *obj_hdl,
2438 			      bool bypass,
2439 			      fsal_async_cb done_cb,
2440 			      struct fsal_io_arg *read_arg,
2441 			      void *caller_arg)
2442 	{
2443 		int maxReadSize;
2444 		int rc;
2445 		int opcnt = 0;
2446 		struct pxy_obj_handle *ph;
2447 		sessionid4 sid;
2448 	#define FSAL_READ2_NB_OP_ALLOC 3 /* SEQUENCE + PUTFH + READ */
2449 		nfs_argop4 argoparray[FSAL_READ2_NB_OP_ALLOC];
2450 		nfs_resop4 resoparray[FSAL_READ2_NB_OP_ALLOC];
2451 		READ4resok *rok;
2452 	
2453 		ph = container_of(obj_hdl, struct pxy_obj_handle, obj);
2454 	
2455 		maxReadSize = op_ctx->fsal_export->exp_ops.fs_maxread(
2456 								op_ctx->fsal_export);
2457 		if (read_arg->iov[0].iov_len > maxReadSize)
2458 			read_arg->iov[0].iov_len = maxReadSize;
2459 	
2460 		/* SEQUENCE */
2461 		pxy_get_client_sessionid(sid);
2462 		COMPOUNDV4_ARG_ADD_OP_SEQUENCE(opcnt, argoparray, sid, NB_RPC_SLOT);
2463 		/* prepare PUTFH */
2464 		COMPOUNDV4_ARG_ADD_OP_PUTFH(opcnt, argoparray, ph->fh4);
2465 		/* prepare READ */
2466 		rok = &resoparray[opcnt].nfs_resop4_u.opread.READ4res_u.resok4;
2467 		rok->data.data_val = read_arg->iov[0].iov_base;
2468 		rok->data.data_len = read_arg->iov[0].iov_len;
2469 		if (bypass)
2470 			COMPOUNDV4_ARG_ADD_OP_READ_BYPASS(opcnt, argoparray,
2471 							  read_arg->offset,
2472 							  read_arg->iov[0].iov_len);
2473 		else {
2474 			if (read_arg->state) {
2475 				struct pxy_state *pxy_state_id = container_of(
2476 							read_arg->state,
2477 							struct pxy_state, state);
2478 	
2479 				COMPOUNDV4_ARG_ADD_OP_READ(opcnt, argoparray,
2480 							   read_arg->offset,
2481 							   read_arg->iov[0].iov_len,
2482 							   pxy_state_id->stateid.other);
2483 	
2484 			} else {
2485 				COMPOUNDV4_ARG_ADD_OP_READ_STATELESS(opcnt, argoparray,
2486 						     read_arg->offset,
2487 						     read_arg->iov[0].iov_len);
2488 			}
2489 		}
2490 	
2491 		/* nfs call */
2492 		rc = pxy_nfsv4_call(op_ctx->creds, opcnt, argoparray, resoparray);
2493 		if (rc != NFS4_OK) {
2494 			done_cb(obj_hdl, nfsstat4_to_fsal(rc), read_arg, caller_arg);
2495 			return;
2496 		}
2497 	
2498 		read_arg->end_of_file = rok->eof;
2499 		read_arg->io_amount = rok->data.data_len;
2500 		if (read_arg->info) {
2501 			read_arg->info->io_content.what = NFS4_CONTENT_DATA;
2502 			read_arg->info->io_content.data.d_offset = read_arg->offset +
2503 				read_arg->io_amount;
2504 			read_arg->info->io_content.data.d_data.data_len =
2505 				read_arg->io_amount;
2506 			read_arg->info->io_content.data.d_data.data_val =
2507 				read_arg->iov[0].iov_base;
2508 		}
2509 		done_cb(obj_hdl, fsalstat(0, 0), read_arg, caller_arg);
2510 	}
2511 	
2512 	static void pxy_write2(struct fsal_obj_handle *obj_hdl,
2513 			       bool bypass,
2514 			       fsal_async_cb done_cb,
2515 			       struct fsal_io_arg *write_arg,
2516 			       void *caller_arg)
2517 	{
2518 		int maxWriteSize;
2519 		int rc;
2520 		int opcnt = 0;
2521 		sessionid4 sid;
2522 	#define FSAL_WRITE_NB_OP_ALLOC 3 /* SEQUENCE + PUTFH + WRITE */
2523 		nfs_argop4 argoparray[FSAL_WRITE_NB_OP_ALLOC];
2524 		nfs_resop4 resoparray[FSAL_WRITE_NB_OP_ALLOC];
2525 		WRITE4resok *wok;
2526 		struct pxy_obj_handle *ph;
2527 		stable_how4 stable_how;
2528 		size_t buffer_size = write_arg->iov[0].iov_len;
2529 	
2530 		ph = container_of(obj_hdl, struct pxy_obj_handle, obj);
2531 	
2532 		/* check max write size */
2533 		maxWriteSize = op_ctx->fsal_export->exp_ops.fs_maxwrite(
2534 								op_ctx->fsal_export);
2535 		if (buffer_size > maxWriteSize)
2536 			buffer_size = maxWriteSize;
2537 	
2538 		/* SEQUENCE */
2539 		pxy_get_client_sessionid(sid);
2540 		COMPOUNDV4_ARG_ADD_OP_SEQUENCE(opcnt, argoparray, sid, NB_RPC_SLOT);
2541 		/* prepare PUTFH */
2542 		COMPOUNDV4_ARG_ADD_OP_PUTFH(opcnt, argoparray, ph->fh4);
2543 		/* prepare write */
2544 		wok = &resoparray[opcnt].nfs_resop4_u.opwrite.WRITE4res_u.resok4;
2545 	
2546 		if (write_arg->fsal_stable)
2547 			stable_how = DATA_SYNC4;
2548 		else
2549 			stable_how = UNSTABLE4;
2550 		if (write_arg->state) {
2551 			struct pxy_state *pxy_state_id = container_of(write_arg->state,
2552 								      struct pxy_state,
2553 								      state);
2554 	
2555 			COMPOUNDV4_ARG_ADD_OP_WRITE(opcnt, argoparray,
2556 						    write_arg->offset,
2557 						    write_arg->iov[0].iov_base,
2558 						    buffer_size, stable_how,
2559 						    pxy_state_id->stateid.other);
2560 		} else {
2561 			COMPOUNDV4_ARG_ADD_OP_WRITE_STATELESS(opcnt, argoparray,
2562 							  write_arg->offset,
2563 							  write_arg->iov[0].iov_base,
2564 							  buffer_size, stable_how);
2565 		}
2566 	
2567 		/* nfs call */
2568 		rc = pxy_nfsv4_call(op_ctx->creds, opcnt, argoparray, resoparray);
2569 		if (rc != NFS4_OK) {
2570 			done_cb(obj_hdl, nfsstat4_to_fsal(rc), write_arg, caller_arg);
2571 			return;
2572 		}
2573 	
2574 		/* get res */
2575 		write_arg->io_amount = wok->count;
2576 		if (wok->committed == UNSTABLE4)
2577 			write_arg->fsal_stable = false;
2578 		else
2579 			write_arg->fsal_stable = true;
2580 	
2581 		done_cb(obj_hdl, fsalstat(ERR_FSAL_NO_ERROR, 0), write_arg, caller_arg);
2582 	}
2583 	
2584 	static fsal_status_t pxy_close2(struct fsal_obj_handle *obj_hdl,
2585 					struct state_t *state)
2586 	{
2587 		struct pxy_obj_handle *ph;
2588 		int rc;
2589 		int opcnt = 0;
2590 		sessionid4 sessionid;
2591 		/* SEQUENCE, PUTFH, CLOSE */
2592 	#define FSAL_CLOSE_NB_OP_ALLOC 3
2593 		nfs_argop4 argoparray[FSAL_CLOSE_NB_OP_ALLOC];
2594 		nfs_resop4 resoparray[FSAL_CLOSE_NB_OP_ALLOC];
2595 		char All_Zero[] = "\0\0\0\0\0\0\0\0\0\0\0\0";	/* 12 times \0 */
2596 		struct pxy_state *pxy_state_id = NULL;
2597 	
2598 		ph = container_of(obj_hdl, struct pxy_obj_handle, obj);
2599 	
2600 		/* Check if this was a "stateless" open,
2601 		 * then nothing is to be done at close */
2602 		if (!state) {
2603 			return fsalstat(ERR_FSAL_NO_ERROR, 0);
2604 		} else {
2605 			pxy_state_id = container_of(state, struct pxy_state, state);
2606 			if (!memcmp(pxy_state_id->stateid.other, All_Zero, 12))
2607 				return fsalstat(ERR_FSAL_NO_ERROR, 0);
2608 		}
2609 	
2610 		/* SEQUENCE */
2611 		pxy_get_client_sessionid(sessionid);
2612 		COMPOUNDV4_ARG_ADD_OP_SEQUENCE(opcnt, argoparray, sessionid,
2613 					       NB_RPC_SLOT);
2614 		/* PUTFH */
2615 		COMPOUNDV4_ARG_ADD_OP_PUTFH(opcnt, argoparray, ph->fh4);
2616 		/* CLOSE */
2617 		if (state)
2618 			COMPOUNDV4_ARG_ADD_OP_CLOSE_4_1(opcnt, argoparray,
2619 							pxy_state_id->stateid);
2620 		else
2621 			COMPOUNDV4_ARG_ADD_OP_CLOSE_4_1_STATELESS(opcnt, argoparray);
2622 	
2623 		rc = pxy_nfsv4_call(op_ctx->creds, opcnt, argoparray, resoparray);
2624 		if (rc != NFS4_OK) {
2625 			return nfsstat4_to_fsal(rc);
2626 		}
2627 	
2628 		/* We clean local saved stateid. */
2629 		if (state)
2630 			memset(&pxy_state_id->stateid, 0, sizeof(stateid4));
2631 	
2632 		return fsalstat(ERR_FSAL_NO_ERROR, 0);
2633 	}
2634 	
2635 	static fsal_status_t pxy_setattr2(struct fsal_obj_handle *obj_hdl,
2636 					  bool bypass,
2637 					  struct state_t *state,
2638 					  struct attrlist *attrib_set)
2639 	{
2640 		int rc;
2641 		fattr4 input_attr;
2642 		uint32_t opcnt = 0;
2643 		struct pxy_obj_handle *ph;
2644 		sessionid4 sid;
2645 	#define FSAL_SETATTR2_NB_OP_ALLOC 3 /* SEQUENCE PUTFH SETATTR */
2646 		nfs_argop4 argoparray[FSAL_SETATTR2_NB_OP_ALLOC];
2647 		nfs_resop4 resoparray[FSAL_SETATTR2_NB_OP_ALLOC];
2648 	
2649 		/*prepare attributes */
2650 		/*
2651 		* No way to update CTIME using a NFSv4 SETATTR.
2652 		* Server will return NFS4ERR_INVAL (22).
2653 		* time_metadata is a readonly attribute in NFSv4 and NFSv4.1.
2654 		* (section 5.7 in RFC7530 or RFC5651)
2655 		* Nevermind : this update is useless, we prevent it.
2656 		*/
2657 		FSAL_UNSET_MASK(attrib_set->valid_mask, ATTR_CTIME);
2658 	
2659 		if (FSAL_TEST_MASK(attrib_set->valid_mask, ATTR_MODE))
2660 			attrib_set->mode &=
2661 				~op_ctx->fsal_export->exp_ops.fs_umask(
2662 								op_ctx->fsal_export);
2663 	
2664 		ph = container_of(obj_hdl, struct pxy_obj_handle, obj);
2665 	
2666 		if (pxy_fsalattr_to_fattr4(attrib_set, &input_attr) == -1)
2667 			return fsalstat(ERR_FSAL_INVAL, EINVAL);
2668 	
2669 		/* SEQUENCE */
2670 		pxy_get_client_sessionid(sid);
2671 		COMPOUNDV4_ARG_ADD_OP_SEQUENCE(opcnt, argoparray, sid, NB_RPC_SLOT);
2672 		/* prepare PUTFH */
2673 		COMPOUNDV4_ARG_ADD_OP_PUTFH(opcnt, argoparray, ph->fh4);
2674 	
2675 		/* prepare SETATTR */
2676 		resoparray[opcnt].nfs_resop4_u.opsetattr.attrsset = empty_bitmap;
2677 	
2678 		/* We don't use the special "bypass" stateid. */
2679 		/* Indeed, bypass state will be treated like anonymous value. */
2680 		/* RFC 5661, section 8.2.3 */
2681 		if (state) {
2682 			struct pxy_state *pxy_state_id = container_of(state,
2683 								      struct pxy_state,
2684 								      state);
2685 			COMPOUNDV4_ARG_ADD_OP_SETATTR(opcnt, argoparray, input_attr,
2686 						      pxy_state_id->stateid.other);
2687 		} else {
2688 			COMPOUNDV4_ARG_ADD_OP_SETATTR_STATELESS(opcnt, argoparray,
2689 								input_attr);
2690 		}
2691 	
2692 		/* nfs call */
2693 		rc = pxy_nfsv4_call(op_ctx->creds, opcnt, argoparray, resoparray);
2694 		nfs4_Fattr_Free(&input_attr);
2695 		if (rc != NFS4_OK)
2696 			return nfsstat4_to_fsal(rc);
2697 	
2698 		return fsalstat(ERR_FSAL_NO_ERROR, 0);
2699 	}
2700 	
2701 	static fsal_openflags_t pxy_status2(struct fsal_obj_handle *obj_hdl,
2702 				     struct state_t *state)
2703 	{
2704 		/* first version of support_ex, no state, no saved openflags */
2705 		fsal_openflags_t null_flags = 0; /* closed and deny_none*/
2706 	
2707 		return null_flags;
2708 	}
2709 	
2710 	static fsal_status_t pxy_reopen2(struct fsal_obj_handle *obj_hdl,
2711 				  struct state_t *state,
2712 				  fsal_openflags_t openflags)
2713 	{
2714 		/* no way to open by handle in v4 */
2715 		/* waiting for v4.1 or solid state to really do the job */
2716 	
2717 		return fsalstat(ERR_FSAL_NO_ERROR, 0);
2718 	}
2719 	
2720 	static fsal_status_t pxy_commit2(struct fsal_obj_handle *obj_hdl,
2721 			      off_t offset,
2722 			      size_t len)
2723 	{
2724 		struct pxy_obj_handle *ph;
2725 		int rc; /* return code of nfs call */
2726 		int opcnt = 0; /* nfs arg counter */
2727 		sessionid4 sid;
2728 	#define FSAL_COMMIT2_NB_OP 3 /* SEQUENCE, PUTFH, COMMIT */
2729 		nfs_argop4 argoparray[FSAL_COMMIT2_NB_OP];
2730 		nfs_resop4 resoparray[FSAL_COMMIT2_NB_OP];
2731 	
2732 		ph = container_of(obj_hdl, struct pxy_obj_handle, obj);
2733 	
2734 		/* SEQUENCE */
2735 		pxy_get_client_sessionid(sid);
2736 		COMPOUNDV4_ARG_ADD_OP_SEQUENCE(opcnt, argoparray, sid, NB_RPC_SLOT);
2737 		/* prepare PUTFH */
2738 		COMPOUNDV4_ARG_ADD_OP_PUTFH(opcnt, argoparray, ph->fh4);
2739 	
2740 		/* prepare COMMIT */
2741 		COMPOUNDV4_ARG_ADD_OP_COMMIT(opcnt, argoparray, offset, len);
2742 	
2743 		/* nfs call */
2744 		rc = pxy_nfsv4_call(op_ctx->creds, opcnt, argoparray, resoparray);
2745 		if (rc != NFS4_OK)
2746 			return nfsstat4_to_fsal(rc);
2747 	
2748 		return fsalstat(ERR_FSAL_NO_ERROR, 0);
2749 	}
2750 	
2751 	void pxy_handle_ops_init(struct fsal_obj_ops *ops)
2752 	{
2753 		fsal_default_obj_ops_init(ops);
2754 	
2755 		ops->release = pxy_hdl_release;
2756 		ops->lookup = pxy_lookup;
2757 		ops->readdir = pxy_readdir;
2758 		ops->mkdir = pxy_mkdir;
2759 		ops->mknode = pxy_mknod;
2760 		ops->symlink = pxy_symlink;
2761 		ops->readlink = pxy_readlink;
2762 		ops->getattrs = pxy_getattrs;
2763 		ops->link = pxy_link;
2764 		ops->rename = pxy_rename;
2765 		ops->unlink = pxy_unlink;
2766 		ops->close = pxy_close;
2767 		ops->handle_to_wire = pxy_handle_to_wire;
2768 		ops->handle_to_key = pxy_handle_to_key;
2769 		ops->open2 = pxy_open2;
2770 		ops->read2 = pxy_read2;
2771 		ops->write2 = pxy_write2;
2772 		ops->close2 = pxy_close2;
2773 		ops->setattr2 = pxy_setattr2;
2774 		ops->status2 = pxy_status2;
2775 		ops->reopen2 = pxy_reopen2;
2776 		ops->commit2 = pxy_commit2;
2777 	}
2778 	
#ifdef PROXY_HANDLE_MAPPING
/**
 * @brief Hash the bytes of an NFSv4 file handle.
 *
 * The handle is consumed one unsigned int at a time; each word is folded
 * in with hash = 3 * hash + 5 * word + 1999. Bytes left over when the
 * length is not a multiple of sizeof(unsigned int) are packed into one
 * final word, first byte in the most significant position, and folded in
 * the same way.
 *
 * @param[in] fh     File handle to hash
 * @param[in] cookie Initial value of the hash
 *
 * @return The resulting hash value.
 */
static unsigned int hash_nfs_fh4(const nfs_fh4 *fh, unsigned int cookie)
{
	const char *bytes = fh->nfs_fh4_val;
	const unsigned int total = fh->nfs_fh4_len;
	const unsigned int tail = total % sizeof(unsigned int);
	const unsigned int whole = total - tail;
	unsigned int hash = cookie;
	unsigned int word;
	unsigned int pos;

	/* Full words; memcpy avoids any alignment assumption. */
	for (pos = 0; pos < whole; pos += sizeof(unsigned int)) {
		memcpy(&word, bytes + pos, sizeof(unsigned int));
		hash = (3 * hash + 5 * word + 1999);
	}

	/*
	 * Reading one more full word here would run past the end of the
	 * handle, so the trailing bytes are gathered one by one instead.
	 */
	if (tail != 0) {
		word = 0;
		for (; pos < total; pos++)
			word = (word << 8) | (uint8_t) bytes[pos];
		hash = (3 * hash + 5 * word + 1999);
	}

	return hash;
}
#endif
2812 	
2813 	static struct pxy_obj_handle *pxy_alloc_handle(struct fsal_export *exp,
2814 						       const nfs_fh4 *fh,
2815 						       fattr4 *obj_attributes,
2816 						       struct attrlist *attrs_out)
2817 	{
2818 		struct pxy_obj_handle *n = gsh_calloc(1, sizeof(*n) + fh->nfs_fh4_len);
2819 		compound_data_t data;
2820 		struct attrlist attributes;
2821 	
2822 		memset(&attributes, 0, sizeof(attributes));
2823 		memset(&data, 0, sizeof(data));
2824 	
2825 		data.current_obj = &n->obj;
2826 	
2827 		if (nfs4_Fattr_To_FSAL_attr(&attributes,
2828 					    obj_attributes,
2829 					    &data) != NFS4_OK) {
2830 			gsh_free(n);
2831 			return NULL;
2832 		}
2833 	
2834 		n->fh4 = *fh;
2835 		n->fh4.nfs_fh4_val = n->blob.bytes;
2836 		memcpy(n->blob.bytes, fh->nfs_fh4_val, fh->nfs_fh4_len);
2837 		n->blob.len = fh->nfs_fh4_len + sizeof(n->blob);
2838 		n->blob.type = attributes.type;
2839 	#ifdef PROXY_HANDLE_MAPPING
2840 		int rc;
2841 	
2842 		memset(&n->h23, 0, sizeof(n->h23));
2843 		n->h23.len = sizeof(n->h23);
2844 		n->h23.type = PXY_HANDLE_MAPPED;
2845 		n->h23.object_id = n->obj.fileid;
2846 		n->h23.handle_hash = hash_nfs_fh4(fh, n->obj.fileid);
2847 	
2848 		rc = HandleMap_SetFH(&n->h23, &n->blob, n->blob.len);
2849 		if ((rc != HANDLEMAP_SUCCESS) && (rc != HANDLEMAP_EXISTS)) {
2850 			gsh_free(n);
2851 			return NULL;
2852 		}
2853 	#endif
2854 	
2855 		fsal_obj_handle_init(&n->obj, exp, attributes.type);
2856 		n->obj.fs = NULL;
2857 		n->obj.state_hdl = NULL;
2858 		n->obj.fsid = attributes.fsid;
2859 		n->obj.fileid = attributes.fileid;
2860 		n->obj.obj_ops = &PROXY.handle_ops;
2861 		if (attrs_out != NULL) {
2862 			/* We aren't keeping ACL ref ourself, so pass it
2863 			 * to the caller.
2864 			 */
2865 			fsal_copy_attrs(attrs_out, &attributes, true);
2866 		} else {
2867 			/* Make sure we release the attributes. */
2868 			fsal_release_attrs(&attributes);
2869 		}
2870 	
2871 		return n;
2872 	}
2873 	
2874 	/* export methods that create object handles
2875 	 */
2876 	
2877 	fsal_status_t pxy_lookup_path(struct fsal_export *exp_hdl,
2878 				      const char *path,
2879 				      struct fsal_obj_handle **handle,
2880 				      struct attrlist *attrs_out)
2881 	{
2882 		struct fsal_obj_handle *next;
2883 		struct fsal_obj_handle *parent = NULL;
2884 		char *saved;
2885 		char *pcopy;
2886 		char *p, *pnext;
2887 		struct user_cred *creds = op_ctx->creds;
2888 	
2889 		pcopy = gsh_strdup(path);
2890 	
2891 		p = strtok_r(pcopy, "/", &saved);
2892 		if (!p) {
2893 			fsal_status_t st = pxy_lookup_impl(parent, exp_hdl, creds,
2894 							   NULL, &next, attrs_out);
2895 			if (FSAL_IS_ERROR(st)) {
2896 				gsh_free(pcopy);
2897 				return st;
2898 			}
2899 		}
2900 		while (p) {
2901 			if (strcmp(p, "..") == 0) {
2902 				/* Don't allow lookup of ".." */
2903 				LogInfo(COMPONENT_FSAL,
2904 					"Attempt to use \"..\" element in path %s",
2905 					path);
2906 				gsh_free(pcopy);
2907 				return fsalstat(ERR_FSAL_ACCESS, EACCES);
2908 			}
2909 			/* Get the next token now, so we know if we are at the
2910 			 * terminal token or not.
2911 			 */
2912 			pnext = strtok_r(NULL, "/", &saved);
2913 	
2914 			/* Note that if any element is a symlink, the following will
2915 			 * fail, thus no security exposure. Only pass back the
2916 			 * attributes of the terminal lookup.
2917 			 */
2918 			fsal_status_t st = pxy_lookup_impl(parent, exp_hdl, creds, p,
2919 							   &next, pnext == NULL ?
2920 							   attrs_out : NULL);
2921 			if (FSAL_IS_ERROR(st)) {
2922 				gsh_free(pcopy);
2923 				return st;
2924 			}
2925 	
2926 			p = pnext;
2927 			parent = next;
2928 		}
2929 		/* The final element could be a symlink, but either way we are called
2930 		 * will not work with a symlink, so no security exposure there.
2931 		 */
2932 	
2933 		gsh_free(pcopy);
2934 		*handle = next;
2935 		return fsalstat(ERR_FSAL_NO_ERROR, 0);
2936 	}
2937 	
2938 	/*
2939 	 * Create an FSAL 'object' from the handle - used
2940 	 * to construct objects from a handle which has been
2941 	 * 'extracted' by .wire_to_host.
2942 	 */
2943 	fsal_status_t pxy_create_handle(struct fsal_export *exp_hdl,
2944 					struct gsh_buffdesc *hdl_desc,
2945 					struct fsal_obj_handle **handle,
2946 					struct attrlist *attrs_out)
2947 	{
2948 		nfs_fh4 fh4;
2949 		struct pxy_obj_handle *ph;
2950 		struct pxy_handle_blob *blob;
2951 		int rc;
2952 		uint32_t opcnt = 0;
2953 		sessionid4 sid;
2954 	#define FSAL_CREATE_HANDLE_NB_OP_ALLOC 3 /* SEQUENCE PUTFH GETATTR */
2955 		nfs_argop4 argoparray[FSAL_CREATE_HANDLE_NB_OP_ALLOC];
2956 		nfs_resop4 resoparray[FSAL_CREATE_HANDLE_NB_OP_ALLOC];
2957 		GETATTR4resok *atok;
2958 		char fattr_blob[FATTR_BLOB_SZ];
2959 	
2960 		blob = (struct pxy_handle_blob *)hdl_desc->addr;
2961 		if (blob->len != hdl_desc->len)
2962 			return fsalstat(ERR_FSAL_INVAL, 0);
2963 	
2964 		fh4.nfs_fh4_val = blob->bytes;
2965 		fh4.nfs_fh4_len = blob->len - sizeof(*blob);
2966 	
2967 		/* SEQUENCE */
2968 		pxy_get_client_sessionid(sid);
2969 		COMPOUNDV4_ARG_ADD_OP_SEQUENCE(opcnt, argoparray, sid, NB_RPC_SLOT);
2970 		COMPOUNDV4_ARG_ADD_OP_PUTFH(opcnt, argoparray, fh4);
2971 	
2972 		atok = pxy_fill_getattr_reply(resoparray + opcnt, fattr_blob,
2973 					      sizeof(fattr_blob));
2974 		COMPOUNDV4_ARG_ADD_OP_GETATTR(opcnt, argoparray, pxy_bitmap_getattr);
2975 	
2976 		rc = pxy_nfsv4_call(op_ctx->creds, opcnt, argoparray, resoparray);
2977 	
2978 		if (rc != NFS4_OK)
2979 			return nfsstat4_to_fsal(rc);
2980 	
2981 		ph = pxy_alloc_handle(exp_hdl, &fh4, &atok->obj_attributes,
2982 				      attrs_out);
2983 		if (!ph)
2984 			return fsalstat(ERR_FSAL_FAULT, 0);
2985 	
2986 		*handle = &ph->obj;
2987 		return fsalstat(ERR_FSAL_NO_ERROR, 0);
2988 	}
2989 	
2990 	fsal_status_t pxy_get_dynamic_info(struct fsal_export *exp_hdl,
2991 					   struct fsal_obj_handle *obj_hdl,
2992 					   fsal_dynamicfsinfo_t *infop)
2993 	{
2994 		int rc;
2995 		int opcnt = 0;
2996 		sessionid4 sid;
2997 	#define FSAL_FSINFO_NB_OP_ALLOC 3 /* SEQUENCE PUTFH GETATTR */
2998 		nfs_argop4 argoparray[FSAL_FSINFO_NB_OP_ALLOC];
2999 		nfs_resop4 resoparray[FSAL_FSINFO_NB_OP_ALLOC];
3000 		GETATTR4resok *atok;
3001 		char fattr_blob[48];	/* 6 values, 8 bytes each */
3002 		struct pxy_obj_handle *ph;
3003 	
3004 		ph = container_of(obj_hdl, struct pxy_obj_handle, obj);
3005 	
3006 		/* SEQUENCE */
3007 		pxy_get_client_sessionid(sid);
3008 		COMPOUNDV4_ARG_ADD_OP_SEQUENCE(opcnt, argoparray, sid, NB_RPC_SLOT);
3009 		COMPOUNDV4_ARG_ADD_OP_PUTFH(opcnt, argoparray, ph->fh4);
3010 		atok =
3011 		    pxy_fill_getattr_reply(resoparray + opcnt, fattr_blob,
3012 					   sizeof(fattr_blob));
3013 		COMPOUNDV4_ARG_ADD_OP_GETATTR(opcnt, argoparray, pxy_bitmap_fsinfo);
3014 	
3015 		rc = pxy_nfsv4_call(op_ctx->creds, opcnt, argoparray, resoparray);
3016 		if (rc != NFS4_OK)
3017 			return nfsstat4_to_fsal(rc);
3018 	
3019 		if (nfs4_Fattr_To_fsinfo(infop, &atok->obj_attributes) != NFS4_OK)
3020 			return fsalstat(ERR_FSAL_INVAL, 0);
3021 	
3022 		return fsalstat(ERR_FSAL_NO_ERROR, 0);
3023 	}
3024 	
3025 	/* Convert of-the-wire digest into unique 'handle' which
3026 	 * can be used to identify the object */
3027 	fsal_status_t pxy_wire_to_host(struct fsal_export *exp_hdl,
3028 				       fsal_digesttype_t in_type,
3029 				       struct gsh_buffdesc *fh_desc,
3030 				       int flags)
3031 	{
3032 		struct pxy_handle_blob *pxyblob;
3033 		size_t fh_size;
3034 	
3035 		if (!fh_desc || !fh_desc->addr)
3036 			return fsalstat(ERR_FSAL_FAULT, EINVAL);
3037 	
3038 		pxyblob = (struct pxy_handle_blob *)fh_desc->addr;
3039 		fh_size = pxyblob->len;
3040 	#ifdef PROXY_HANDLE_MAPPING
3041 		if (in_type == FSAL_DIGEST_NFSV3)
3042 			fh_size = sizeof(nfs23_map_handle_t);
3043 	#endif
3044 		if (fh_desc->len != fh_size) {
3045 			LogMajor(COMPONENT_FSAL,
3046 				 "Size mismatch for handle.  should be %zu, got %zu",
3047 				 fh_size, fh_desc->len);
3048 			return fsalstat(ERR_FSAL_SERVERFAULT, 0);
3049 		}
3050 	#ifdef PROXY_HANDLE_MAPPING
3051 		if (in_type == FSAL_DIGEST_NFSV3) {
3052 			nfs23_map_handle_t *h23 = (nfs23_map_handle_t *) fh_desc->addr;
3053 	
3054 			if (h23->type != PXY_HANDLE_MAPPED)
3055 				return fsalstat(ERR_FSAL_STALE, ESTALE);
3056 	
3057 			/* As long as HandleMap_GetFH copies nfs23 handle into
3058 			 * the key before lookup I can get away with using
3059 			 * the same buffer for input and output */
3060 			if (HandleMap_GetFH(h23, fh_desc) != HANDLEMAP_SUCCESS)
3061 				return fsalstat(ERR_FSAL_STALE, 0);
3062 			fh_size = fh_desc->len;
3063 		}
3064 	#endif
3065 	
3066 		fh_desc->len = fh_size;
3067 		return fsalstat(ERR_FSAL_NO_ERROR, 0);
3068 	}
3069