1    	/*
2    	 * vim:noexpandtab:shiftwidth=8:tabstop=8:
3    	 *
4    	 * Copyright (C) Red Hat  Inc., 2013
5    	 * Author: Anand Subramanian anands@redhat.com
6    	 *
7    	 * This program is free software; you can redistribute it and/or
8    	 * modify it under the terms of the GNU Lesser General Public
9    	 * License as published by the Free Software Foundation; either
10   	 * version 3 of the License, or (at your option) any later version.
11   	 *
12   	 * This program is distributed in the hope that it will be useful,
13   	 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14   	 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15   	 * Lesser General Public License for more details.
16   	 *
17   	 * You should have received a copy of the GNU Lesser General Public
18   	 * License along with this library; if not, write to the Free Software
19   	 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
20   	 *
21   	 * -------------
22   	 */
23   	
24   	#include "config.h"
25   	#ifdef LINUX
26   	#include <sys/sysmacros.h> /* for makedev(3) */
27   	#endif
28   	#include <fcntl.h>
29   	#include "fsal.h"
30   	#include "gluster_internal.h"
31   	#include "FSAL/fsal_commonlib.h"
32   	#include "fsal_convert.h"
33   	#include "pnfs_utils.h"
34   	#include "nfs_exports.h"
35   	#include "sal_data.h"
36   	#include "sal_functions.h"
37   	#include "fsal_types.h"
38   	
39   	/* fsal_obj_handle common methods
40   	 */
41   	
42   	/**
43   	 * @brief Implements GLUSTER FSAL objectoperation handle_release
44   	 *
45   	 * Free up the GLUSTER handle and associated data if any
46   	 * Typically free up any members of the struct glusterfs_handle
47   	 */
48   	
49   	static void handle_release(struct fsal_obj_handle *obj_hdl)
50   	{
51   		int rc = 0;
52   		struct glusterfs_handle *objhandle =
53   		    container_of(obj_hdl, struct glusterfs_handle, handle);
54   		struct glusterfs_fd *my_fd = &objhandle->globalfd;
55   		fsal_status_t status = { ERR_FSAL_NO_ERROR, 0 };
56   	#ifdef GLTIMING
57   		struct timespec s_time, e_time;
58   	
59   		now(&s_time);
60   	#endif
61   	
62   		fsal_obj_handle_fini(&objhandle->handle);
63   	
64   		if (my_fd->glfd) {
65   	
66   			/* During shutdown, the op_ctx is NULL */
67   			if (op_ctx && op_ctx->fsal_export) {
68   				/* Since handle gets released as part of internal
69   				 * operation, we may not need to set credentials */
70   				status = glusterfs_close_my_fd(my_fd);
71   				if (FSAL_IS_ERROR(status)) {
72   					LogCrit(COMPONENT_FSAL,
73   						"glusterfs_close_my_fd returned %s",
74   						fsal_err_txt(status));
75   					/* cleanup as much as possible */
76   				}
77   			} else if (my_fd->openflags != FSAL_O_CLOSED) {
78   				rc = glfs_close(my_fd->glfd);
79   				if (rc) {
80   					LogCrit(COMPONENT_FSAL,
81   						"glfs_close returned %s(%d)",
82   						strerror(errno), errno);
83   				}
84   			}
85   			my_fd->glfd = NULL;
86   		}
87   	
88   		if (my_fd->creds.caller_garray) {
89   			gsh_free(my_fd->creds.caller_garray);
90   			my_fd->creds.caller_garray = NULL;
91   		}
92   	
93   		if (objhandle->glhandle) {
94   			rc = glfs_h_close(objhandle->glhandle);
95   			if (rc) {
96   				LogCrit(COMPONENT_FSAL,
97   					"glfs_h_close returned error %s(%d)",
98   					strerror(errno), errno);
99   			}
100  			objhandle->glhandle = NULL;
101  		}
102  	
103  		gsh_free(objhandle);
104  	
105  	#ifdef GLTIMING
106  		now(&e_time);
107  		latency_update(&s_time, &e_time, lat_handle_release);
108  	#endif
109  	}
110  	
111  	/**
112  	 * @brief Implements GLUSTER FSAL objectoperation lookup
113  	 */
114  	
115  	static fsal_status_t lookup(struct fsal_obj_handle *parent,
116  				    const char *path, struct fsal_obj_handle **handle,
117  				    struct attrlist *attrs_out)
118  	{
119  		int rc = 0;
120  		fsal_status_t status = { ERR_FSAL_NO_ERROR, 0 };
121  		struct stat sb;
122  		struct glfs_object *glhandle = NULL;
123  		unsigned char globjhdl[GFAPI_HANDLE_LENGTH] = {'\0'};
124  		char vol_uuid[GLAPI_UUID_LENGTH] = {'\0'};
125  		struct glusterfs_handle *objhandle = NULL;
126  		struct glusterfs_export *glfs_export =
127  		    container_of(op_ctx->fsal_export, struct glusterfs_export, export);
128  		struct glusterfs_handle *parenthandle =
129  		    container_of(parent, struct glusterfs_handle, handle);
130  		glusterfs_fsal_xstat_t buffxstat = {.e_acl = NULL, .i_acl = NULL};
131  	
132  	#ifdef GLTIMING
133  		struct timespec s_time, e_time;
134  	
135  		now(&s_time);
136  	#endif
137  	
138  		glhandle = glfs_h_lookupat(glfs_export->gl_fs->fs,
139  					parenthandle->glhandle, path, &sb, 0);
140  		if (glhandle == NULL) {
141  			status = gluster2fsal_error(errno);
142  			goto out;
143  		}
144  	
145  		rc = glfs_h_extract_handle(glhandle, globjhdl, GFAPI_HANDLE_LENGTH);
146  		if (rc < 0) {
147  			status = gluster2fsal_error(errno);
148  			goto out;
149  		}
150  	
151  		rc = glfs_get_volumeid(glfs_export->gl_fs->fs, vol_uuid,
152  				       GLAPI_UUID_LENGTH);
153  		if (rc < 0) {
154  			status = gluster2fsal_error(errno);
155  			goto out;
156  		}
157  	
158  		construct_handle(glfs_export, &sb, glhandle, globjhdl,
159  				 &objhandle, vol_uuid);
160  	
161  		if (attrs_out != NULL) {
162  			posix2fsal_attributes_all(&sb, attrs_out);
163  	
164  			if (attrs_out->request_mask & ATTR_ACL) {
165  				/* Fetch the ACL */
166  				status = glusterfs_get_acl(glfs_export,
167  							   glhandle,
168  							   &buffxstat, attrs_out);
169  				if (status.major == ERR_FSAL_NOENT) {
170  					if (attrs_out->type == SYMBOLIC_LINK)
171  						status = fsalstat(ERR_FSAL_NO_ERROR, 0);
172  					else
173  						status = gluster2fsal_error(ESTALE);
174  				}
175  	
176  				if (!FSAL_IS_ERROR(status)) {
177  					/* Success, so mark ACL as valid. */
178  					attrs_out->valid_mask |= ATTR_ACL;
179  				} else {
180  					if (attrs_out->request_mask & ATTR_RDATTR_ERR)
181  						attrs_out->valid_mask = ATTR_RDATTR_ERR;
182  	
183  					fsal_release_attrs(attrs_out);
184  					goto out;
185  				}
186  			}
187  		}
188  	
189  		*handle = &objhandle->handle;
190  	
191  	 out:
192  		if (status.major != ERR_FSAL_NO_ERROR)
193  			gluster_cleanup_vars(glhandle);
194  	#ifdef GLTIMING
195  		now(&e_time);
196  		latency_update(&s_time, &e_time, lat_lookup);
197  	#endif
198  	
199  		glusterfs_fsal_clean_xstat(&buffxstat);
200  		return status;
201  	}
202  	
203  	static int
204  	glusterfs_fsal_get_sec_label(struct glusterfs_handle *glhandle,
205  				     struct attrlist *attrs)
206  	{
207  		int rc = 0;
208  		struct glusterfs_export *export =
209  			container_of(op_ctx->fsal_export,
210  				     struct glusterfs_export, export);
211  	
212  		if (FSAL_TEST_MASK(attrs->request_mask, ATTR4_SEC_LABEL) &&
213  		    op_ctx_export_has_option(EXPORT_OPTION_SECLABEL_SET)) {
214  	
215  			char label[NFS4_OPAQUE_LIMIT];
216  	
217  			rc = glfs_h_getxattrs(export->gl_fs->fs, glhandle->glhandle,
218  					export->sec_label_xattr, label,
219  					NFS4_OPAQUE_LIMIT);
220  	
221  			if (rc < 0) {
222  				/* If there's no label then just do zero-length one */
223  				if (errno != ENODATA) {
224  					rc = -errno;
225  					goto out_err;
226  				}
227  				rc = 0;
228  			}
229  	
230  			attrs->sec_label.slai_data.slai_data_len = rc;
231  			gsh_free(attrs->sec_label.slai_data.slai_data_val);
232  			if (rc > 0) {
233  				attrs->sec_label.slai_data.slai_data_val =
234  					gsh_memdup(label, rc);
235  				FSAL_SET_MASK(attrs->valid_mask, ATTR4_SEC_LABEL);
236  			} else {
237  				attrs->sec_label.slai_data.slai_data_val = NULL;
238  				FSAL_UNSET_MASK(attrs->valid_mask, ATTR4_SEC_LABEL);
239  			}
240  		}
241  	out_err:
242  		return rc;
243  	}
244  	
245  	/**
246  	 * @brief Implements GLUSTER FSAL objectoperation readdir
247  	 */
248  	
249  	static fsal_status_t read_dirents(struct fsal_obj_handle *dir_hdl,
250  					  fsal_cookie_t *whence, void *dir_state,
251  					  fsal_readdir_cb cb, attrmask_t attrmask,
252  					  bool *eof)
253  	{
254  		int rc = 0;
255  		fsal_status_t status = { ERR_FSAL_NO_ERROR, 0 };
256  		struct glfs_fd *glfd = NULL;
257  		long offset = 0;
258  		struct dirent *pde = NULL;
259  		struct glusterfs_export *glfs_export =
260  		    container_of(op_ctx->fsal_export, struct glusterfs_export, export);
261  		struct glusterfs_handle *objhandle =
262  		    container_of(dir_hdl, struct glusterfs_handle, handle);
263  	
264  	#ifdef USE_GLUSTER_XREADDIRPLUS
265  		struct glfs_object *glhandle = NULL;
266  		struct glfs_xreaddirp_stat *xstat = NULL;
267  		uint32_t flags =
268  			 (GFAPI_XREADDIRP_STAT | GFAPI_XREADDIRP_HANDLE);
269  		struct glfs_object *tmp = NULL;
270  		struct stat *sb;
271  		glusterfs_fsal_xstat_t buffxstat = {.e_acl = NULL, .i_acl = NULL};
272  	#endif
273  	
274  	#ifdef GLTIMING
275  		struct timespec s_time, e_time;
276  	
277  		now(&s_time);
278  	#endif
279  	
280  		SET_GLUSTER_CREDS(glfs_export, &op_ctx->creds->caller_uid,
281  				  &op_ctx->creds->caller_gid,
282  				  op_ctx->creds->caller_glen,
283  				  op_ctx->creds->caller_garray,
284  				  op_ctx->client->addr.addr,
285  				  op_ctx->client->addr.len);
286  	
287  		/** @todo : Can we use globalfd instead */
288  		glfd = glfs_h_opendir(glfs_export->gl_fs->fs, objhandle->glhandle);
289  	
290  		SET_GLUSTER_CREDS(glfs_export, NULL, NULL, 0, NULL, NULL, 0);
291  	
292  		if (glfd == NULL)
293  			return gluster2fsal_error(errno);
294  	
295  		if (whence != NULL)
296  			offset = *whence;
297  	
298  		glfs_seekdir(glfd, offset);
299  	
300  		while (!(*eof)) {
301  			struct dirent de;
302  			struct fsal_obj_handle *obj;
303  	
304  			SET_GLUSTER_CREDS(glfs_export, &op_ctx->creds->caller_uid,
305  					  &op_ctx->creds->caller_gid,
306  					  op_ctx->creds->caller_glen,
307  					  op_ctx->creds->caller_garray,
308  					  op_ctx->client->addr.addr,
309  					  op_ctx->client->addr.len);
310  	
311  	#ifndef USE_GLUSTER_XREADDIRPLUS
312  			rc = glfs_readdir_r(glfd, &de, &pde);
313  	#else
314  			rc = glfs_xreaddirplus_r(glfd, flags, &xstat, &de, &pde);
315  	#endif
316  	
317  			SET_GLUSTER_CREDS(glfs_export, NULL, NULL, 0, NULL, NULL, 0);
318  	
319  			if (rc < 0) {
320  				status = gluster2fsal_error(errno);
321  				goto out;
322  			}
323  	
324  			if (rc == 0 && pde == NULL) {
325  				*eof = true;
326  				goto out;
327  			}
328  	
329  			struct attrlist attrs;
330  			enum fsal_dir_result cb_rc;
331  	
332  			/* skip . and .. */
333  			if ((strcmp(de.d_name, ".") == 0)
334  			    || (strcmp(de.d_name, "..") == 0)) {
335  	#ifdef USE_GLUSTER_XREADDIRPLUS
336  				if (xstat) {
337  					glfs_free(xstat);
338  					xstat = NULL;
339  				}
340  	#endif
341  				continue;
342  			}
343  			fsal_prepare_attrs(&attrs, attrmask);
344  	
345  	#ifndef USE_GLUSTER_XREADDIRPLUS
346  			status = lookup(dir_hdl, de.d_name, &obj, &attrs);
347  			if (FSAL_IS_ERROR(status))
348  				goto out;
349  	#else
350  			if (!xstat || !(rc & GFAPI_XREADDIRP_HANDLE)) {
351  				status = gluster2fsal_error(errno);
352  				goto out;
353  			}
354  	
355  			sb = glfs_xreaddirplus_get_stat(xstat);
356  			tmp = glfs_xreaddirplus_get_object(xstat);
357  	
358  			if (!sb || !tmp) {
359  				status = gluster2fsal_error(errno);
360  				goto out;
361  			}
362  	
363  			glhandle = glfs_object_copy(tmp);
364  			if (!glhandle) {
365  				status = gluster2fsal_error(errno);
366  				goto out;
367  			}
368  	
369  			status = glfs2fsal_handle(glfs_export, glhandle, &obj,
370  						  sb, &attrs);
371  			glfs_free(xstat);
372  			xstat = NULL;
373  	
374  			if (FSAL_IS_ERROR(status)) {
375  				gluster_cleanup_vars(glhandle);
376  				goto out;
377  			}
378  	
379  			if (attrs.request_mask & ATTR_ACL) {
380  				/* Fetch the ACL */
381  				status = glusterfs_get_acl(glfs_export,
382  							   glhandle,
383  							   &buffxstat, &attrs);
384  				if (status.major == ERR_FSAL_NOENT) {
385  					if (attrs.type == SYMBOLIC_LINK)
386  						status = fsalstat(ERR_FSAL_NO_ERROR, 0);
387  					else
388  						status = gluster2fsal_error(ESTALE);
389  				}
390  	
391  				if (!FSAL_IS_ERROR(status)) {
392  					/* Success, so mark ACL as valid. */
393  					attrs.valid_mask |= ATTR_ACL;
394  				} else {
395  					if (attrs.request_mask & ATTR_RDATTR_ERR)
396  						attrs.valid_mask = ATTR_RDATTR_ERR;
397  	
398  					fsal_release_attrs(&attrs);
399  					glusterfs_fsal_clean_xstat(&buffxstat);
400  					goto out;
401  				}
402  			}
403  	#endif
404  			rc = glusterfs_fsal_get_sec_label(objhandle, &attrs);
405  			if (rc < 0) {
406  				status = gluster2fsal_error(errno);
407  				goto out;
408  			}
409  	
410  			cb_rc = cb(de.d_name, obj, &attrs,
411  				   dir_state, glfs_telldir(glfd));
412  	
413  			fsal_release_attrs(&attrs);
414  			glusterfs_fsal_clean_xstat(&buffxstat);
415  	
416  			/* Read ahead not supported by this FSAL. */
417  			if (cb_rc >= DIR_READAHEAD)
418  				goto out;
419  		}
420  	
421  	 out:
422  	#ifdef USE_GLUSTER_XREADDIRPLUS
423  		if (xstat)
424  			glfs_free(xstat);
425  	#endif
426  		SET_GLUSTER_CREDS(glfs_export, &op_ctx->creds->caller_uid,
427  				  &op_ctx->creds->caller_gid,
428  				  op_ctx->creds->caller_glen,
429  				  op_ctx->creds->caller_garray,
430  				  op_ctx->client->addr.addr,
431  				  op_ctx->client->addr.len);
432  	
433  		rc = glfs_closedir(glfd);
434  	
435  		SET_GLUSTER_CREDS(glfs_export, NULL, NULL, 0, NULL, NULL, 0);
436  		if (rc < 0)
437  			status = gluster2fsal_error(errno);
438  	#ifdef GLTIMING
439  		now(&e_time);
440  		latency_update(&s_time, &e_time, lat_read_dirents);
441  	#endif
442  		return status;
443  	}
444  	
445  	/**
446  	 * @brief Implements GLUSTER FSAL objectoperation mkdir
447  	 */
448  	
449  	static fsal_status_t makedir(struct fsal_obj_handle *dir_hdl,
450  				     const char *name, struct attrlist *attrib,
451  				     struct fsal_obj_handle **handle,
452  				     struct attrlist *attrs_out)
453  	{
454  		int rc = 0;
455  		fsal_status_t status = { ERR_FSAL_NO_ERROR, 0 };
456  		struct stat sb;
457  		struct glfs_object *glhandle = NULL;
458  		unsigned char globjhdl[GFAPI_HANDLE_LENGTH] = {'\0'};
459  		char vol_uuid[GLAPI_UUID_LENGTH] = {'\0'};
460  		struct glusterfs_handle *objhandle = NULL;
461  		struct glusterfs_export *glfs_export =
462  		    container_of(op_ctx->fsal_export, struct glusterfs_export, export);
463  		struct glusterfs_handle *parenthandle =
464  		    container_of(dir_hdl, struct glusterfs_handle, handle);
465  	#ifdef GLTIMING
466  		struct timespec s_time, e_time;
467  	
468  		now(&s_time);
469  	#endif
470  	
471  		SET_GLUSTER_CREDS(glfs_export, &op_ctx->creds->caller_uid,
472  				  &op_ctx->creds->caller_gid,
473  				  op_ctx->creds->caller_glen,
474  				  op_ctx->creds->caller_garray,
475  				  op_ctx->client->addr.addr,
476  				  op_ctx->client->addr.len);
477  	
478  		glhandle =
479  		    glfs_h_mkdir(glfs_export->gl_fs->fs, parenthandle->glhandle, name,
480  				 fsal2unix_mode(attrib->mode), &sb);
481  	
482  		SET_GLUSTER_CREDS(glfs_export, NULL, NULL, 0, NULL, NULL, 0);
483  	
484  		if (glhandle == NULL) {
485  			status = gluster2fsal_error(errno);
486  			goto out;
487  		}
488  	
489  		rc = glfs_h_extract_handle(glhandle, globjhdl, GFAPI_HANDLE_LENGTH);
490  		if (rc < 0) {
491  			status = gluster2fsal_error(errno);
492  			goto out;
493  		}
494  	
495  		rc = glfs_get_volumeid(glfs_export->gl_fs->fs, vol_uuid,
496  				       GLAPI_UUID_LENGTH);
497  		if (rc < 0) {
498  			status = gluster2fsal_error(errno);
499  			goto out;
500  		}
501  	
502  		construct_handle(glfs_export, &sb, glhandle, globjhdl,
503  				 &objhandle, vol_uuid);
504  	
505  		if (attrs_out != NULL) {
506  			posix2fsal_attributes_all(&sb, attrs_out);
507  		}
508  	
509  		*handle = &objhandle->handle;
510  	
511  		/* We handled the mode above. */
512  		FSAL_UNSET_MASK(attrib->valid_mask, ATTR_MODE);
513  	
514  		if (attrib->valid_mask) {
515  			/* Now per support_ex API, if there are any other attributes
516  			 * set, go ahead and get them set now.
517  			 */
518  			status = (*handle)->obj_ops->setattr2(*handle, false, NULL,
519  							     attrib);
520  			if (FSAL_IS_ERROR(status)) {
521  				/* Release the handle we just allocated. */
522  				LogFullDebug(COMPONENT_FSAL,
523  					     "setattr2 status=%s",
524  					     fsal_err_txt(status));
525  				(*handle)->obj_ops->release(*handle);
526  				/* We released handle at this point */
527  				glhandle = NULL;
528  				*handle = NULL;
529  			}
530  		} else {
531  			status.major = ERR_FSAL_NO_ERROR;
532  			status.minor = 0;
533  		}
534  	
535  		FSAL_SET_MASK(attrib->valid_mask, ATTR_MODE);
536  	
537  	 out:
538  		if (status.major != ERR_FSAL_NO_ERROR)
539  			gluster_cleanup_vars(glhandle);
540  	
541  	#ifdef GLTIMING
542  		now(&e_time);
543  		latency_update(&s_time, &e_time, lat_makedir);
544  	#endif
545  		return status;
546  	}
547  	
548  	/**
549  	 * @brief Implements GLUSTER FSAL objectoperation mknode
550  	 */
551  	
552  	static fsal_status_t makenode(struct fsal_obj_handle *dir_hdl,
553  				      const char *name, object_file_type_t nodetype,
554  				      struct attrlist *attrib,
555  				      struct fsal_obj_handle **handle,
556  				      struct attrlist *attrs_out)
557  	{
558  		int rc = 0;
559  		fsal_status_t status = { ERR_FSAL_NO_ERROR, 0 };
560  		struct stat sb;
561  		struct glfs_object *glhandle = NULL;
562  		unsigned char globjhdl[GFAPI_HANDLE_LENGTH] = {'\0'};
563  		char vol_uuid[GLAPI_UUID_LENGTH] = {'\0'};
564  		struct glusterfs_handle *objhandle = NULL;
565  		dev_t ndev = { 0, };
566  		struct glusterfs_export *glfs_export =
567  		    container_of(op_ctx->fsal_export, struct glusterfs_export, export);
568  		struct glusterfs_handle *parenthandle =
569  		    container_of(dir_hdl, struct glusterfs_handle, handle);
570  		mode_t create_mode;
571  	#ifdef GLTIMING
572  		struct timespec s_time, e_time;
573  	
574  		now(&s_time);
575  	#endif
576  	
577  		switch (nodetype) {
578  		case BLOCK_FILE:
579  			/* FIXME: This needs a feature flag test? */
580  			ndev = makedev(attrib->rawdev.major, attrib->rawdev.minor);
581  			create_mode = S_IFBLK;
582  			break;
583  		case CHARACTER_FILE:
584  			ndev = makedev(attrib->rawdev.major, attrib->rawdev.minor);
585  			create_mode = S_IFCHR;
586  			break;
587  		case FIFO_FILE:
588  			create_mode = S_IFIFO;
589  			break;
590  		case SOCKET_FILE:
591  			create_mode = S_IFSOCK;
592  			break;
593  		default:
594  			LogMajor(COMPONENT_FSAL, "Invalid node type in FSAL_mknode: %d",
595  				 nodetype);
596  			return fsalstat(ERR_FSAL_INVAL, 0);
597  		}
598  	
599  		SET_GLUSTER_CREDS(glfs_export, &op_ctx->creds->caller_uid,
600  				  &op_ctx->creds->caller_gid,
601  				  op_ctx->creds->caller_glen,
602  				  op_ctx->creds->caller_garray,
603  				  op_ctx->client->addr.addr,
604  				  op_ctx->client->addr.len);
605  	
606  		glhandle =
607  		    glfs_h_mknod(glfs_export->gl_fs->fs, parenthandle->glhandle, name,
608  				 create_mode | fsal2unix_mode(attrib->mode), ndev, &sb);
609  	
610  		SET_GLUSTER_CREDS(glfs_export, NULL, NULL, 0, NULL, NULL, 0);
611  	
612  		if (glhandle == NULL) {
613  			status = gluster2fsal_error(errno);
614  			goto out;
615  		}
616  	
617  		rc = glfs_h_extract_handle(glhandle, globjhdl, GFAPI_HANDLE_LENGTH);
618  		if (rc < 0) {
619  			status = gluster2fsal_error(errno);
620  			goto out;
621  		}
622  	
623  		rc = glfs_get_volumeid(glfs_export->gl_fs->fs, vol_uuid,
624  				       GLAPI_UUID_LENGTH);
625  		if (rc < 0) {
626  			status = gluster2fsal_error(errno);
627  			goto out;
628  		}
629  	
630  		construct_handle(glfs_export, &sb, glhandle, globjhdl,
631  				 &objhandle, vol_uuid);
632  	
633  		if (attrs_out != NULL) {
634  			posix2fsal_attributes_all(&sb, attrs_out);
635  		}
636  	
637  		*handle = &objhandle->handle;
638  	
639  		/* We handled the mode above. */
640  		FSAL_UNSET_MASK(attrib->valid_mask, ATTR_MODE);
641  	
642  		if (attrib->valid_mask) {
643  			/* Now per support_ex API, if there are any other attributes
644  			 * set, go ahead and get them set now.
645  			 */
646  			status = (*handle)->obj_ops->setattr2(*handle, false, NULL,
647  							     attrib);
648  			if (FSAL_IS_ERROR(status)) {
649  				/* Release the handle we just allocated. */
650  				LogFullDebug(COMPONENT_FSAL,
651  					     "setattr2 status=%s",
652  					     fsal_err_txt(status));
653  				(*handle)->obj_ops->release(*handle);
654  				/* We released handle at this point */
655  				glhandle = NULL;
656  				*handle = NULL;
657  			}
658  		} else {
659  			status.major = ERR_FSAL_NO_ERROR;
660  			status.minor = 0;
661  		}
662  	
663  		FSAL_SET_MASK(attrib->valid_mask, ATTR_MODE);
664  	
665  	 out:
666  		if (status.major != ERR_FSAL_NO_ERROR)
667  			gluster_cleanup_vars(glhandle);
668  	#ifdef GLTIMING
669  		now(&e_time);
670  		latency_update(&s_time, &e_time, lat_makenode);
671  	#endif
672  		return status;
673  	}
674  	
675  	/**
676  	 * @brief Implements GLUSTER FSAL objectoperation symlink
677  	 */
678  	
679  	static fsal_status_t makesymlink(struct fsal_obj_handle *dir_hdl,
680  					 const char *name, const char *link_path,
681  					 struct attrlist *attrib,
682  					 struct fsal_obj_handle **handle,
683  					 struct attrlist *attrs_out)
684  	{
685  		int rc = 0;
686  		fsal_status_t status = { ERR_FSAL_NO_ERROR, 0 };
687  		struct stat sb;
688  		struct glfs_object *glhandle = NULL;
689  		unsigned char globjhdl[GFAPI_HANDLE_LENGTH] = {'\0'};
690  		char vol_uuid[GLAPI_UUID_LENGTH] = {'\0'};
691  		struct glusterfs_handle *objhandle = NULL;
692  		struct glusterfs_export *glfs_export =
693  		    container_of(op_ctx->fsal_export, struct glusterfs_export, export);
694  		struct glusterfs_handle *parenthandle =
695  		    container_of(dir_hdl, struct glusterfs_handle, handle);
696  	#ifdef GLTIMING
697  		struct timespec s_time, e_time;
698  	
699  		now(&s_time);
700  	#endif
701  	
702  		SET_GLUSTER_CREDS(glfs_export, &op_ctx->creds->caller_uid,
703  				  &op_ctx->creds->caller_gid,
704  				  op_ctx->creds->caller_glen,
705  				  op_ctx->creds->caller_garray,
706  				  op_ctx->client->addr.addr,
707  				  op_ctx->client->addr.len);
708  	
709  		glhandle =
710  		    glfs_h_symlink(glfs_export->gl_fs->fs, parenthandle->glhandle, name,
711  				   link_path, &sb);
712  	
713  		SET_GLUSTER_CREDS(glfs_export, NULL, NULL, 0, NULL, NULL, 0);
714  	
715  		if (glhandle == NULL) {
716  			status = gluster2fsal_error(errno);
717  			goto out;
718  		}
719  	
720  		rc = glfs_h_extract_handle(glhandle, globjhdl, GFAPI_HANDLE_LENGTH);
721  		if (rc < 0) {
722  			status = gluster2fsal_error(errno);
723  			goto out;
724  		}
725  	
726  		rc = glfs_get_volumeid(glfs_export->gl_fs->fs, vol_uuid,
727  				       GLAPI_UUID_LENGTH);
728  		if (rc < 0) {
729  			status = gluster2fsal_error(errno);
730  			goto out;
731  		}
732  	
733  		construct_handle(glfs_export, &sb, glhandle, globjhdl,
734  				 &objhandle, vol_uuid);
735  	
736  		if (attrs_out != NULL) {
737  			posix2fsal_attributes_all(&sb, attrs_out);
738  		}
739  	
740  		*handle = &objhandle->handle;
741  	
742  		if (attrib->valid_mask) {
743  			/* Now per support_ex API, if there are any other attributes
744  			 * set, go ahead and get them set now.
745  			 */
746  			status = (*handle)->obj_ops->setattr2(*handle, false, NULL,
747  							     attrib);
748  			if (FSAL_IS_ERROR(status)) {
749  				/* Release the handle we just allocated. */
750  				LogFullDebug(COMPONENT_FSAL,
751  					     "setattr2 status=%s",
752  					     fsal_err_txt(status));
753  				(*handle)->obj_ops->release(*handle);
754  				/* We released handle at this point */
755  				glhandle = NULL;
756  				*handle = NULL;
757  			}
758  		} else {
759  			status.major = ERR_FSAL_NO_ERROR;
760  			status.minor = 0;
761  		}
762  	
763  	 out:
764  		if (status.major != ERR_FSAL_NO_ERROR)
765  			gluster_cleanup_vars(glhandle);
766  	
767  	#ifdef GLTIMING
768  		now(&e_time);
769  		latency_update(&s_time, &e_time, lat_makesymlink);
770  	#endif
771  	
772  		return status;
773  	}
774  	
775  	/**
776  	 * @brief Implements GLUSTER FSAL objectoperation readlink
777  	 */
778  	
779  	static fsal_status_t readsymlink(struct fsal_obj_handle *obj_hdl,
780  					 struct gsh_buffdesc *link_content,
781  					 bool refresh)
782  	{
783  		int rc = 0;
784  		fsal_status_t status = { ERR_FSAL_NO_ERROR, 0 };
785  		struct glusterfs_export *glfs_export =
786  		    container_of(op_ctx->fsal_export, struct glusterfs_export, export);
787  		struct glusterfs_handle *objhandle =
788  		    container_of(obj_hdl, struct glusterfs_handle, handle);
789  	#ifdef GLTIMING
790  		struct timespec s_time, e_time;
791  	
792  		now(&s_time);
793  	#endif
794  	
795  		link_content->len = MAXPATHLEN; /* Max link path */
796  		link_content->addr = gsh_malloc(link_content->len);
797  	
798  		SET_GLUSTER_CREDS(glfs_export, &op_ctx->creds->caller_uid,
799  				  &op_ctx->creds->caller_gid,
800  				  op_ctx->creds->caller_glen,
801  				  op_ctx->creds->caller_garray,
802  				  op_ctx->client->addr.addr,
803  				  op_ctx->client->addr.len);
804  	
805  		rc = glfs_h_readlink(glfs_export->gl_fs->fs, objhandle->glhandle,
806  				     link_content->addr, link_content->len);
807  	
808  		SET_GLUSTER_CREDS(glfs_export, NULL, NULL, 0, NULL, NULL, 0);
809  	
810  		if (rc < 0) {
811  			status = gluster2fsal_error(errno);
812  			goto out;
813  		}
814  	
815  		if (rc >= MAXPATHLEN) {
816  			status = gluster2fsal_error(EINVAL);
817  			goto out;
818  		}
819  	
820  		/* rc is the number of bytes copied into link_content->addr
821  		 * without including '\0' character. */
822  		*(char *)(link_content->addr + rc) = '\0';
823  		link_content->len = rc + 1;
824  	
825  	 out:
826  		if (status.major != ERR_FSAL_NO_ERROR) {
827  			gsh_free(link_content->addr);
828  			link_content->addr = NULL;
829  			link_content->len = 0;
830  		}
831  	#ifdef GLTIMING
832  		now(&e_time);
833  		latency_update(&s_time, &e_time, lat_readsymlink);
834  	#endif
835  	
836  		return status;
837  	}
838  	
839  	/**
840  	 * @brief Implements GLUSTER FSAL objectoperation getattrs
841  	 */
842  	
843  	static fsal_status_t getattrs(struct fsal_obj_handle *obj_hdl,
844  				      struct attrlist *attrs)
845  	{
846  		int rc = 0;
847  		fsal_status_t status = { ERR_FSAL_NO_ERROR, 0 };
848  		glusterfs_fsal_xstat_t buffxstat = { 0 };
849  		struct glusterfs_export *glfs_export =
850  		    container_of(op_ctx->fsal_export, struct glusterfs_export, export);
851  		struct glusterfs_handle *objhandle =
852  		    container_of(obj_hdl, struct glusterfs_handle, handle);
853  	#ifdef GLTIMING
854  		struct timespec s_time, e_time;
855  	
856  		now(&s_time);
857  	#endif
858  	
859  		/*
860  		 * There is a kind of race here when the glfd part of the
861  		 * FSAL GLUSTER object handle is destroyed during a close
862  		 * coming in from another NFSv3 WRITE thread which does
863  		 * cache_inode_open(). Since the context/fd is destroyed
864  		 * we cannot depend on glfs_fstat assuming glfd is valid.
865  	
866  		 * Fixing the issue by removing the glfs_fstat call here.
867  	
868  		 * So default to glfs_h_stat and re-optimize if a better
869  		 * way is found - that may involve introducing locks in
870  		 * the gfapi's for close and getattrs etc.
871  		 */
872  	
873  		/** @todo: With support_ex() above may no longer be valid.
874  		 * This needs to be revisited */
875  	
876  		/* @todo: with POSIX ACLs every user shall have permissions to
877  		 * read stat & ACLs. But that may not be the case with RichACLs.
878  		 * If the ganesha service is started by non-root user, that user
879  		 * may get restricted from reading ACL.
880  		 */
881  	
882  		rc = glfs_h_stat(glfs_export->gl_fs->fs,
883  				 objhandle->glhandle, &buffxstat.buffstat);
884  	
885  		if (rc != 0) {
886  			if (errno == ENOENT)
887  				status = gluster2fsal_error(ESTALE);
888  			else
889  				status = gluster2fsal_error(errno);
890  	
891  			if (attrs->request_mask & ATTR_RDATTR_ERR) {
892  				/* Caller asked for error to be visible. */
893  				attrs->valid_mask = ATTR_RDATTR_ERR;
894  			}
895  			goto out;
896  		}
897  	
898  		stat2fsal_attributes(&buffxstat.buffstat, attrs);
899  	
900  		if (obj_hdl->type == DIRECTORY)
901  			buffxstat.is_dir = true;
902  		else
903  			buffxstat.is_dir = false;
904  	
905  		if (attrs->request_mask & ATTR_ACL) {
906  			/* Fetch the ACL */
907  			status = glusterfs_get_acl(glfs_export, objhandle->glhandle,
908  						   &buffxstat, attrs);
909  			if (!FSAL_IS_ERROR(status)) {
910  				/* Success, so mark ACL as valid. */
911  				attrs->valid_mask |= ATTR_ACL;
912  			}
913  		}
914  	
915  		rc = glusterfs_fsal_get_sec_label(objhandle, attrs);
916  	
917  		if (rc < 0) {
918  			if (errno == ENOENT)
919  				status = gluster2fsal_error(ESTALE);
920  			else
921  				status = gluster2fsal_error(errno);
922  			if (attrs->request_mask & ATTR_RDATTR_ERR) {
923  				/* Caller asked for error to be visible. */
924  				attrs->valid_mask = ATTR_RDATTR_ERR;
925  			}
926  			goto out;
927  		}
928  	
929  		/* *
930  		* The error ENOENT is not an expected error for GETATTRS
931  		* Due to this, operations such as RENAME will fail when
932  		* it calls GETATTRS on removed file. But for dead links
933  		* we should not return error
934  		* */
935  		if (status.major == ERR_FSAL_NOENT) {
936  			if (obj_hdl->type == SYMBOLIC_LINK)
937  				status = fsalstat(ERR_FSAL_NO_ERROR, 0);
938  			else
939  				status = gluster2fsal_error(ESTALE);
940  		}
941  	
942  		if (FSAL_IS_ERROR(status)) {
943  			if (attrs->request_mask & ATTR_RDATTR_ERR) {
944  				/* Caller asked for error to be visible. */
945  				attrs->valid_mask = ATTR_RDATTR_ERR;
946  			}
947  		}
948  	
949  	 out:
950  	#ifdef GLTIMING
951  		now(&e_time);
952  		latency_update(&s_time, &e_time, lat_getattrs);
953  	#endif
954  	
955  		glusterfs_fsal_clean_xstat(&buffxstat);
956  		return status;
957  	}
958  	
959  	/**
960  	 * @brief Implements GLUSTER FSAL objectoperation link
961  	 */
962  	
963  	static fsal_status_t linkfile(struct fsal_obj_handle *obj_hdl,
964  				      struct fsal_obj_handle *destdir_hdl,
965  				      const char *name)
966  	{
967  		int rc = 0;
968  		fsal_status_t status = { ERR_FSAL_NO_ERROR, 0 };
969  		struct glusterfs_export *glfs_export =
970  		    container_of(op_ctx->fsal_export, struct glusterfs_export, export);
971  		struct glusterfs_handle *objhandle =
972  		    container_of(obj_hdl, struct glusterfs_handle, handle);
973  		struct glusterfs_handle *dstparenthandle =
974  		    container_of(destdir_hdl, struct glusterfs_handle, handle);
975  	#ifdef GLTIMING
976  		struct timespec s_time, e_time;
977  	
978  		now(&s_time);
979  	#endif
980  	
981  		SET_GLUSTER_CREDS(glfs_export, &op_ctx->creds->caller_uid,
982  				  &op_ctx->creds->caller_gid,
983  				  op_ctx->creds->caller_glen,
984  				  op_ctx->creds->caller_garray,
985  				  op_ctx->client->addr.addr,
986  				  op_ctx->client->addr.len);
987  	
988  		rc = glfs_h_link(glfs_export->gl_fs->fs, objhandle->glhandle,
989  				 dstparenthandle->glhandle, name);
990  	
991  		SET_GLUSTER_CREDS(glfs_export, NULL, NULL, 0, NULL, NULL, 0);
992  	
993  		if (rc != 0) {
994  			status = gluster2fsal_error(errno);
995  			goto out;
996  		}
997  	
998  	 out:
999  	#ifdef GLTIMING
1000 		now(&e_time);
1001 		latency_update(&s_time, &e_time, lat_linkfile);
1002 	#endif
1003 	
1004 		return status;
1005 	}
1006 	
1007 	/**
1008 	 * @brief Implements GLUSTER FSAL objectoperation rename
1009 	 */
1010 	
1011 	static fsal_status_t renamefile(struct fsal_obj_handle *obj_hdl,
1012 					struct fsal_obj_handle *olddir_hdl,
1013 					const char *old_name,
1014 					struct fsal_obj_handle *newdir_hdl,
1015 					const char *new_name)
1016 	{
1017 		int rc = 0;
1018 		fsal_status_t status = { ERR_FSAL_NO_ERROR, 0 };
1019 		struct glusterfs_export *glfs_export =
1020 		    container_of(op_ctx->fsal_export, struct glusterfs_export,
1021 				 export);
1022 		struct glusterfs_handle *srcparenthandle =
1023 		    container_of(olddir_hdl, struct glusterfs_handle, handle);
1024 		struct glusterfs_handle *dstparenthandle =
1025 		    container_of(newdir_hdl, struct glusterfs_handle, handle);
1026 	#ifdef GLTIMING
1027 		struct timespec s_time, e_time;
1028 	
1029 		now(&s_time);
1030 	#endif
1031 	
1032 		SET_GLUSTER_CREDS(glfs_export, &op_ctx->creds->caller_uid,
1033 				  &op_ctx->creds->caller_gid,
1034 				  op_ctx->creds->caller_glen,
1035 				  op_ctx->creds->caller_garray,
1036 				  op_ctx->client->addr.addr,
1037 				  op_ctx->client->addr.len);
1038 	
1039 		rc = glfs_h_rename(glfs_export->gl_fs->fs, srcparenthandle->glhandle,
1040 				   old_name, dstparenthandle->glhandle, new_name);
1041 	
1042 		SET_GLUSTER_CREDS(glfs_export, NULL, NULL, 0, NULL, NULL, 0);
1043 	
1044 		if (rc != 0) {
1045 			status = gluster2fsal_error(errno);
1046 			goto out;
1047 		}
1048 	
1049 	 out:
1050 	#ifdef GLTIMING
1051 		now(&e_time);
1052 		latency_update(&s_time, &e_time, lat_renamefile);
1053 	#endif
1054 	
1055 		return status;
1056 	}
1057 	
1058 	/**
1059 	 * @brief Implements GLUSTER FSAL objectoperation unlink
1060 	 */
1061 	
1062 	static fsal_status_t file_unlink(struct fsal_obj_handle *dir_hdl,
1063 					 struct fsal_obj_handle *obj_hdl,
1064 					 const char *name)
1065 	{
1066 		int rc = 0;
1067 		fsal_status_t status = { ERR_FSAL_NO_ERROR, 0 };
1068 		struct glusterfs_export *glfs_export =
1069 		    container_of(op_ctx->fsal_export, struct glusterfs_export, export);
1070 		struct glusterfs_handle *parenthandle =
1071 		    container_of(dir_hdl, struct glusterfs_handle, handle);
1072 	#ifdef GLTIMING
1073 		struct timespec s_time, e_time;
1074 	
1075 		now(&s_time);
1076 	#endif
1077 	
1078 		SET_GLUSTER_CREDS(glfs_export, &op_ctx->creds->caller_uid,
1079 				  &op_ctx->creds->caller_gid,
1080 				  op_ctx->creds->caller_glen,
1081 				  op_ctx->creds->caller_garray,
1082 				  op_ctx->client->addr.addr,
1083 				  op_ctx->client->addr.len);
1084 	
1085 		rc = glfs_h_unlink(glfs_export->gl_fs->fs, parenthandle->glhandle,
1086 				   name);
1087 	
1088 		SET_GLUSTER_CREDS(glfs_export, NULL, NULL, 0, NULL, NULL, 0);
1089 	
1090 		if (rc != 0)
1091 			status = gluster2fsal_error(errno);
1092 	
1093 	#ifdef GLTIMING
1094 		now(&e_time);
1095 		latency_update(&s_time, &e_time, lat_file_unlink);
1096 	#endif
1097 		return status;
1098 	}
1099 	
1100 	/**
1101 	 * @brief Implements GLUSTER FSAL objectoperation share_op
1102 	 */
1103 	/*
1104 	static fsal_status_t share_op(struct fsal_obj_handle *obj_hdl,
1105 				      void *p_owner,
1106 				      fsal_share_param_t  request_share)
1107 	{
1108 		return fsalstat(ERR_FSAL_NOTSUPP, 0);
1109 	}
1110 	*/
1111 	
1112 	fsal_status_t glusterfs_open_my_fd(struct glusterfs_handle *objhandle,
1113 					   fsal_openflags_t openflags,
1114 					   int posix_flags,
1115 					   struct glusterfs_fd *my_fd)
1116 	{
1117 		fsal_status_t status = { ERR_FSAL_NO_ERROR, 0 };
1118 		struct glfs_fd *glfd = NULL;
1119 		struct glusterfs_export *glfs_export =
1120 		    container_of(op_ctx->fsal_export, struct glusterfs_export, export);
1121 		gid_t **garray_copy = NULL;
1122 	#ifdef GLTIMING
1123 		struct timespec s_time, e_time;
1124 	
1125 		now(&s_time);
1126 	#endif
1127 	
1128 		LogFullDebug(COMPONENT_FSAL,
1129 			     "my_fd->fd = %p openflags = %x, posix_flags = %x",
1130 			     my_fd->glfd, openflags, posix_flags);
1131 	
1132 		assert(my_fd->glfd == NULL
1133 		       && my_fd->openflags == FSAL_O_CLOSED && openflags != 0);
1134 	
1135 		LogFullDebug(COMPONENT_FSAL,
1136 			     "openflags = %x, posix_flags = %x",
1137 			     openflags, posix_flags);
1138 	
1139 		SET_GLUSTER_CREDS(glfs_export, &op_ctx->creds->caller_uid,
1140 				  &op_ctx->creds->caller_gid,
1141 				  op_ctx->creds->caller_glen,
1142 				  op_ctx->creds->caller_garray,
1143 				  op_ctx->client->addr.addr,
1144 				  op_ctx->client->addr.len);
1145 	
1146 		glfd = glfs_h_open(glfs_export->gl_fs->fs, objhandle->glhandle,
1147 				   posix_flags);
1148 	
1149 		/* restore credentials */
1150 		SET_GLUSTER_CREDS(glfs_export, NULL, NULL, 0, NULL, NULL, 0);
1151 	
1152 		if (glfd == NULL) {
1153 			status = gluster2fsal_error(errno);
1154 			goto out;
1155 		}
1156 	
1157 		my_fd->glfd = glfd;
1158 		my_fd->openflags = FSAL_O_NFS_FLAGS(openflags);
1159 		my_fd->creds.caller_uid = op_ctx->creds->caller_uid;
1160 		my_fd->creds.caller_gid = op_ctx->creds->caller_gid;
1161 		my_fd->creds.caller_glen = op_ctx->creds->caller_glen;
1162 		garray_copy = &my_fd->creds.caller_garray;
1163 	
1164 		if ((*garray_copy) != NULL) {
1165 			/* Replace old creds */
1166 			gsh_free(*garray_copy);
1167 			*garray_copy = NULL;
1168 		}
1169 	
1170 		if (op_ctx->creds->caller_glen) {
1171 			(*garray_copy) = gsh_malloc(op_ctx->creds->caller_glen
1172 						    * sizeof(gid_t));
1173 			memcpy((*garray_copy), op_ctx->creds->caller_garray,
1174 				op_ctx->creds->caller_glen * sizeof(gid_t));
1175 		}
1176 	
1177 	#ifdef USE_GLUSTER_DELEGATION
1178 		if ((op_ctx->client->addr.len) &&
1179 			(op_ctx->client->addr.len <= GLAPI_LEASE_ID_SIZE)) {
1180 			memcpy(my_fd->lease_id, op_ctx->client->addr.addr,
1181 			       op_ctx->client->addr.len);
1182 		} else
1183 			memset(my_fd->lease_id, 0, GLAPI_LEASE_ID_SIZE);
1184 	#endif
1185 	
1186 	out:
1187 	#ifdef GLTIMING
1188 		now(&e_time);
1189 		latency_update(&s_time, &e_time, lat_file_open);
1190 	#endif
1191 		return status;
1192 	}
1193 	
1194 	fsal_status_t glusterfs_close_my_fd(struct glusterfs_fd *my_fd)
1195 	{
1196 		int rc = 0;
1197 		fsal_status_t status = { ERR_FSAL_NO_ERROR, 0 };
1198 		struct glusterfs_export *glfs_export =
1199 		    container_of(op_ctx->fsal_export, struct glusterfs_export, export);
1200 	
1201 	#ifdef GLTIMING
1202 		struct timespec s_time, e_time;
1203 	
1204 		now(&s_time);
1205 	#endif
1206 	
1207 		if (my_fd->glfd && my_fd->openflags != FSAL_O_CLOSED) {
1208 	
1209 			/* Use the same credentials which opened up the fd */
1210 			SET_GLUSTER_CREDS(glfs_export, &my_fd->creds.caller_uid,
1211 					  &my_fd->creds.caller_gid,
1212 					  my_fd->creds.caller_glen,
1213 					  my_fd->creds.caller_garray,
1214 	#ifdef USE_GLUSTER_DELEGATION
1215 					  my_fd->lease_id,
1216 					  GLAPI_LEASE_ID_SIZE);
1217 	#else
1218 					  NULL, 0);
1219 	#endif
1220 	
1221 			rc = glfs_close(my_fd->glfd);
1222 	
1223 			/* restore credentials */
1224 			SET_GLUSTER_CREDS(glfs_export, NULL, NULL, 0, NULL, NULL, 0);
1225 	
1226 			if (rc != 0) {
1227 				status = gluster2fsal_error(errno);
1228 				LogCrit(COMPONENT_FSAL,
1229 					"Error : close returns with %s",
1230 					strerror(errno));
1231 			}
1232 		}
1233 	
1234 		my_fd->glfd = NULL;
1235 		my_fd->openflags = FSAL_O_CLOSED;
1236 		my_fd->creds.caller_uid = 0;
1237 		my_fd->creds.caller_gid = 0;
1238 		my_fd->creds.caller_glen = 0;
1239 		if (my_fd->creds.caller_garray) {
1240 			gsh_free(my_fd->creds.caller_garray);
1241 			my_fd->creds.caller_garray = NULL;
1242 		}
1243 	#ifdef USE_GLUSTER_DELEGATION
1244 		memset(my_fd->lease_id, 0, GLAPI_LEASE_ID_SIZE);
1245 	#endif
1246 	
1247 	#ifdef GLTIMING
1248 		now(&e_time);
1249 		latency_update(&s_time, &e_time, lat_file_close);
1250 	#endif
1251 		return status;
1252 	}
1253 	
1254 	/**
1255 	 * @brief Implements GLUSTER FSAL objectoperation close
1256 	   @todo: close2() could be used to close globalfd as well.
1257 	 */
1258 	
1259 	static fsal_status_t file_close(struct fsal_obj_handle *obj_hdl)
1260 	{
1261 		fsal_status_t status = { ERR_FSAL_NO_ERROR, 0 };
1262 		struct glusterfs_handle *objhandle =
1263 		    container_of(obj_hdl, struct glusterfs_handle, handle);
1264 	#ifdef GLTIMING
1265 		struct timespec s_time, e_time;
1266 	
1267 		now(&s_time);
1268 	#endif
1269 	
1270 		assert(obj_hdl->type == REGULAR_FILE);
1271 	
1272 		if (objhandle->globalfd.openflags == FSAL_O_CLOSED)
1273 			return fsalstat(ERR_FSAL_NOT_OPENED, 0);
1274 	
1275 		/* Take write lock on object to protect file descriptor.
1276 		 * This can block over an I/O operation.
1277 		 */
1278 		PTHREAD_RWLOCK_wrlock(&obj_hdl->obj_lock);
1279 	
1280 		status = glusterfs_close_my_fd(&objhandle->globalfd);
1281 		objhandle->globalfd.openflags = FSAL_O_CLOSED;
1282 	
1283 		PTHREAD_RWLOCK_unlock(&obj_hdl->obj_lock);
1284 	
1285 	#ifdef GLTIMING
1286 		now(&e_time);
1287 		latency_update(&s_time, &e_time, lat_file_close);
1288 	#endif
1289 		return status;
1290 	}
1291 	
1292 	/**
1293 	 * @brief Function to open an fsal_obj_handle's global file descriptor.
1294 	 *
1295 	 * @param[in]  obj_hdl     File on which to operate
1296 	 * @param[in]  openflags   Mode for open
1297 	 * @param[out] fd	  File descriptor that is to be used
1298 	 *
1299 	 * @return FSAL status.
1300 	 */
1301 	
1302 	fsal_status_t glusterfs_open_func(struct fsal_obj_handle *obj_hdl,
1303 					  fsal_openflags_t openflags,
1304 					  struct fsal_fd *fd)
1305 	{
1306 		struct glusterfs_handle *myself;
1307 		int posix_flags = 0;
1308 	
1309 		myself = container_of(obj_hdl, struct glusterfs_handle, handle);
1310 	
1311 		fsal2posix_openflags(openflags, &posix_flags);
1312 	
1313 		return glusterfs_open_my_fd(myself, openflags, posix_flags,
1314 					   (struct glusterfs_fd *)fd);
1315 	}
1316 	
1317 	/**
1318 	 * @brief Function to close an fsal_obj_handle's global file descriptor.
1319 	 *
1320 	 * @param[in]  obj_hdl     File on which to operate
1321 	 * @param[in]  fd	  File handle to close
1322 	 *
1323 	 * @return FSAL status.
1324 	 */
1325 	
1326 	fsal_status_t glusterfs_close_func(struct fsal_obj_handle *obj_hdl,
1327 					   struct fsal_fd *fd)
1328 	{
1329 		return glusterfs_close_my_fd((struct glusterfs_fd *)fd);
1330 	}
1331 	
1332 	/**
1333 	 * @brief Find a file descriptor for a read, write, setattr2 operation.
1334 	 *
1335 	 * We do not need file descriptors for non-regular files, so this never has to
1336 	 * handle them.
1337 	 */
1338 	fsal_status_t find_fd(struct glusterfs_fd *my_fd,
1339 			      struct fsal_obj_handle *obj_hdl,
1340 			      bool bypass,
1341 			      struct state_t *state,
1342 			      fsal_openflags_t openflags,
1343 			      bool *has_lock,
1344 			      bool *closefd,
1345 			      bool open_for_locks)
1346 	{
1347 		struct glusterfs_handle *myself;
1348 		fsal_status_t status = {ERR_FSAL_NO_ERROR, 0};
1349 		struct glusterfs_fd  tmp_fd = {0}, *tmp2_fd = &tmp_fd;
1350 		bool reusing_open_state_fd = false;
1351 	
1352 		myself = container_of(obj_hdl, struct glusterfs_handle, handle);
1353 	
1354 		/* Handle only regular files */
1355 		if (obj_hdl->type != REGULAR_FILE)
1356 			return fsalstat(posix2fsal_error(EINVAL), EINVAL);
1357 		status = fsal_find_fd((struct fsal_fd **)&tmp2_fd, obj_hdl,
1358 					(struct fsal_fd *)&myself->globalfd,
1359 					&myself->share, bypass, state,
1360 					openflags, glusterfs_open_func,
1361 					glusterfs_close_func,
1362 					has_lock, closefd, open_for_locks,
1363 					&reusing_open_state_fd);
1364 	
1365 		if (FSAL_IS_ERROR(status))
1366 			goto out;
1367 	
1368 		/* since tmp2_fd is not accessed/closed outside
1369 		* this routine, its safe to copy its variables into my_fd
1370 		* without taking extra reference or allocating extra
1371 		* memory.
1372 		*/
1373 		if (reusing_open_state_fd) {
1374 			my_fd->glfd = glfs_dup(tmp2_fd->glfd);
1375 			if (tmp2_fd->creds.caller_glen)
1376 				my_fd->creds.caller_garray =
1377 					gsh_memdup(tmp2_fd->creds.caller_garray,
1378 						   tmp2_fd->creds.caller_glen *
1379 						   sizeof(gid_t));
1380 			/* Since we dup the fd, we need to close it post
1381 			 * processing the fop.
1382 			 */
1383 			*closefd = true;
1384 		} else {
1385 			my_fd->glfd = tmp2_fd->glfd;
1386 			my_fd->creds.caller_garray = tmp2_fd->creds.caller_garray;
1387 		}
1388 	
1389 		my_fd->openflags = tmp2_fd->openflags;
1390 		my_fd->creds.caller_uid = tmp2_fd->creds.caller_uid;
1391 		my_fd->creds.caller_gid = tmp2_fd->creds.caller_gid;
1392 		my_fd->creds.caller_glen = tmp2_fd->creds.caller_glen;
1393 	#ifdef USE_GLUSTER_DELEGATION
1394 		memcpy(my_fd->lease_id, tmp2_fd->lease_id, GLAPI_LEASE_ID_SIZE);
1395 	#endif
1396 	
1397 	out:
1398 		return status;
1399 	}
1400 	
1401 	/**
1402 	 * @brief Merge a duplicate handle with an original handle
1403 	 *
1404 	 * This function is used if an upper layer detects that a duplicate
1405 	 * object handle has been created. It allows the FSAL to merge anything
1406 	 * from the duplicate back into the original.
1407 	 *
1408 	 * The caller must release the object (the caller may have to close
1409 	 * files if the merge is unsuccessful).
1410 	 *
1411 	 * @param[in]  orig_hdl  Original handle
1412 	 * @param[in]  dupe_hdl Handle to merge into original
1413 	 *
1414 	 * @return FSAL status.
1415 	 *
1416 	 */
1417 	
1418 	fsal_status_t glusterfs_merge(struct fsal_obj_handle *orig_hdl,
1419 				      struct fsal_obj_handle *dupe_hdl)
1420 	{
1421 		fsal_status_t status = {ERR_FSAL_NO_ERROR, 0};
1422 	
1423 		if (orig_hdl->type == REGULAR_FILE &&
1424 		    dupe_hdl->type == REGULAR_FILE) {
1425 			/* We need to merge the share reservations on this file.
1426 			 * This could result in ERR_FSAL_SHARE_DENIED.
1427 			 */
1428 			struct glusterfs_handle *orig, *dupe;
1429 	
1430 			orig = container_of(orig_hdl, struct glusterfs_handle, handle);
1431 			dupe = container_of(dupe_hdl, struct glusterfs_handle, handle);
1432 	
1433 			/* This can block over an I/O operation. */
1434 			PTHREAD_RWLOCK_wrlock(&orig_hdl->obj_lock);
1435 	
1436 			status = merge_share(&orig->share, &dupe->share);
1437 	
1438 			PTHREAD_RWLOCK_unlock(&orig_hdl->obj_lock);
1439 		}
1440 	
1441 		return status;
1442 	}
1443 	
1444 	/* open2
1445 	 */
1446 	
1447 	static fsal_status_t glusterfs_open2(struct fsal_obj_handle *obj_hdl,
1448 					     struct state_t *state,
1449 					     fsal_openflags_t openflags,
1450 					     enum fsal_create_mode createmode,
1451 					     const char *name,
1452 					     struct attrlist *attrib_set,
1453 					     fsal_verifier_t verifier,
1454 					     struct fsal_obj_handle **new_obj,
1455 					     struct attrlist *attrs_out,
1456 					     bool *caller_perm_check)
1457 	{
1458 		fsal_status_t status = { ERR_FSAL_NO_ERROR, 0 };
1459 		int p_flags = 0;
1460 		struct glusterfs_export *glfs_export =
1461 		    container_of(op_ctx->fsal_export, struct glusterfs_export, export);
1462 		struct glusterfs_handle *myself, *parenthandle = NULL;
1463 		struct glusterfs_fd *my_fd = NULL;
1464 		struct stat sb = {0};
1465 		struct glfs_object *glhandle = NULL;
1466 		unsigned char globjhdl[GFAPI_HANDLE_LENGTH] = {'\0'};
1467 		char vol_uuid[GLAPI_UUID_LENGTH] = {'\0'};
1468 		bool truncated;
1469 		bool created = false;
1470 		int retval = 0;
1471 		mode_t unix_mode;
1472 	
1473 	
1474 	#ifdef GLTIMING
1475 		struct timespec s_time, e_time;
1476 	
1477 		now(&s_time);
1478 	#endif
1479 	
1480 		if (state != NULL)
1481 			my_fd = &container_of(state, struct glusterfs_state_fd,
1482 					      state)->glusterfs_fd;
1483 	
1484 		fsal2posix_openflags(openflags, &p_flags);
1485 	
1486 		truncated = (p_flags & O_TRUNC) != 0;
1487 	
1488 		if (createmode >= FSAL_EXCLUSIVE) {
1489 			/* Now fixup attrs for verifier if exclusive create */
1490 			set_common_verifier(attrib_set, verifier);
1491 		}
1492 	
1493 		if (name == NULL) {
1494 			/* This is an open by handle */
1495 			struct glusterfs_handle *myself;
1496 	
1497 			myself = container_of(obj_hdl,
1498 					      struct glusterfs_handle,
1499 					      handle);
1500 	
1501 	#if 0
1502 		/** @todo: fsid work */
1503 		if (obj_hdl->fsal != obj_hdl->fs->fsal) {
1504 			LogDebug(COMPONENT_FSAL,
1505 				 "FSAL %s operation for handle belonging to FSAL %s, return EXDEV",
1506 				 obj_hdl->fsal->name, obj_hdl->fs->fsal->name);
1507 			return fsalstat(posix2fsal_error(EXDEV), EXDEV);
1508 		}
1509 	#endif
1510 	
1511 			if (state != NULL) {
1512 				/* Prepare to take the share reservation, but only if we
1513 				 * are called with a valid state (if state is NULL the
1514 				 * caller is a stateless create such as NFS v3 CREATE).
1515 				 */
1516 	
1517 				/* This can block over an I/O operation. */
1518 				PTHREAD_RWLOCK_wrlock(&obj_hdl->obj_lock);
1519 	
1520 				/* Check share reservation conflicts. */
1521 				status = check_share_conflict(&myself->share,
1522 							      openflags,
1523 							      false);
1524 	
1525 				if (FSAL_IS_ERROR(status)) {
1526 					PTHREAD_RWLOCK_unlock(&obj_hdl->obj_lock);
1527 					return status;
1528 				}
1529 	
1530 				/* Take the share reservation now by updating the
1531 				 * counters.
1532 				 */
1533 				update_share_counters(&myself->share,
1534 						      FSAL_O_CLOSED,
1535 						      openflags);
1536 	
1537 				PTHREAD_RWLOCK_unlock(&obj_hdl->obj_lock);
1538 			} else {
1539 				/* We need to use the global fd to continue, and take
1540 				 * the lock to protect it.
1541 				 */
1542 				my_fd = &myself->globalfd;
1543 				PTHREAD_RWLOCK_wrlock(&obj_hdl->obj_lock);
1544 			}
1545 	
1546 			if (my_fd->openflags != FSAL_O_CLOSED) {
1547 				glusterfs_close_my_fd(my_fd);
1548 			}
1549 	
1550 			/* truncate is set in p_flags */
1551 			status = glusterfs_open_my_fd(myself, openflags, p_flags,
1552 						      my_fd);
1553 	
1554 			if (FSAL_IS_ERROR(status)) {
1555 				status = gluster2fsal_error(errno);
1556 				if (state == NULL) {
1557 					/* Release the lock taken above, and return
1558 					 * since there is nothing to undo.
1559 					 */
1560 					PTHREAD_RWLOCK_unlock(&obj_hdl->obj_lock);
1561 					goto out;
1562 				} else {
1563 					/* Error - need to release the share */
1564 					goto undo_share;
1565 				}
1566 			}
1567 	
1568 			if (createmode >= FSAL_EXCLUSIVE || truncated) {
1569 				/* Fetch the attributes to check against the
1570 				 * verifier in case of exclusive open/create.
1571 				 */
1572 				struct stat stat;
1573 	
1574 				/* set proper credentials */
1575 				/* @todo: with POSIX ACLs every user shall have
1576 				 * permissions to read stat & ACLs. But that may not be
1577 				 * the case with RichACLs. If the ganesha service is
1578 				 * started by non-root user, that user may get
1579 				 * restricted from reading ACL.
1580 				 */
1581 	
1582 				retval = glfs_fstat(my_fd->glfd, &stat);
1583 	
1584 				if (retval == 0) {
1585 					LogFullDebug(COMPONENT_FSAL,
1586 						     "New size = %" PRIx64,
1587 						     stat.st_size);
1588 					if (attrs_out) {
1589 						posix2fsal_attributes_all(&stat,
1590 									  attrs_out);
1591 					}
1592 				} else {
1593 					if (errno == EBADF)
1594 						errno = ESTALE;
1595 					status = fsalstat(posix2fsal_error(errno),
1596 							  errno);
1597 				}
1598 	
1599 				/* Now check verifier for exclusive, but not for
1600 				 * FSAL_EXCLUSIVE_9P.
1601 				 */
1602 				if (!FSAL_IS_ERROR(status) &&
1603 				    createmode >= FSAL_EXCLUSIVE &&
1604 				    createmode != FSAL_EXCLUSIVE_9P &&
1605 				    !check_verifier_stat(&stat,
1606 							 verifier)) {
1607 					/* Verifier didn't match, return EEXIST */
1608 					status =
1609 					    fsalstat(posix2fsal_error(EEXIST), EEXIST);
1610 				}
1611 	
1612 			} else if (attrs_out && attrs_out->request_mask &
1613 				   ATTR_RDATTR_ERR) {
1614 				attrs_out->valid_mask = ATTR_RDATTR_ERR;
1615 			}
1616 	
1617 			if (state == NULL) {
1618 				/* If no state, release the lock taken above and return
1619 				 * status. If success, we haven't done any permission
1620 				 * check so ask the caller to do so.
1621 				 */
1622 				PTHREAD_RWLOCK_unlock(&obj_hdl->obj_lock);
1623 				*caller_perm_check = !FSAL_IS_ERROR(status);
1624 				return status;
1625 			}
1626 	
1627 			if (!FSAL_IS_ERROR(status)) {
1628 				/* Return success. We haven't done any permission
1629 				 * check so ask the caller to do so.
1630 				 */
1631 				*caller_perm_check = true;
1632 				return status;
1633 			}
1634 	
1635 			(void) glusterfs_close_my_fd(my_fd);
1636 	
1637 	 undo_share:
1638 	
1639 			/* Can only get here with state not NULL and an error */
1640 	
1641 			/* On error we need to release our share reservation
1642 			 * and undo the update of the share counters.
1643 			 * This can block over an I/O operation.
1644 			 */
1645 			PTHREAD_RWLOCK_wrlock(&obj_hdl->obj_lock);
1646 	
1647 			update_share_counters(&myself->share,
1648 					      openflags,
1649 					      FSAL_O_CLOSED);
1650 	
1651 			PTHREAD_RWLOCK_unlock(&obj_hdl->obj_lock);
1652 	
1653 			return status;
1654 		}
1655 	
1656 	/* case name_not_null */
1657 		/* In this path where we are opening by name, we can't check share
1658 		 * reservation yet since we don't have an object_handle yet. If we
1659 		 * indeed create the object handle (there is no race with another
1660 		 * open by name), then there CAN NOT be a share conflict, otherwise
1661 		 * the share conflict will be resolved when the object handles are
1662 		 * merged.
1663 		 */
1664 	
1665 		if (createmode != FSAL_NO_CREATE) {
1666 			/* Now add in O_CREAT and O_EXCL. */
1667 			p_flags |= O_CREAT;
1668 	
1669 			/* And if we are at least FSAL_GUARDED, do an O_EXCL create. */
1670 			if (createmode >= FSAL_GUARDED)
1671 				p_flags |= O_EXCL;
1672 	
1673 			/* Fetch the mode attribute to use. */
1674 			unix_mode = fsal2unix_mode(attrib_set->mode) &
1675 			    ~op_ctx->fsal_export->exp_ops.fs_umask(op_ctx->fsal_export);
1676 	
1677 			/* Don't set the mode if we later set the attributes */
1678 			FSAL_UNSET_MASK(attrib_set->valid_mask, ATTR_MODE);
1679 		}
1680 	
1681 		if (createmode == FSAL_UNCHECKED && (attrib_set->valid_mask != 0)) {
1682 			/* If we have FSAL_UNCHECKED and want to set more attributes
1683 			 * than the mode, we attempt an O_EXCL create first, if that
1684 			 * succeeds, then we will be allowed to set the additional
1685 			 * attributes, otherwise, we don't know we created the file
1686 			 * and this can NOT set the attributes.
1687 			 */
1688 			p_flags |= O_EXCL;
1689 		}
1690 	
1691 		/** @todo: we do not have openat implemented yet..meanwhile
1692 		 *  use 'glfs_h_creat'
1693 		 */
1694 	
1695 		/* obtain parent directory handle */
1696 		parenthandle =
1697 		    container_of(obj_hdl, struct glusterfs_handle, handle);
1698 	
1699 	
1700 		if (createmode == FSAL_NO_CREATE) {
1701 			/* lookup if the object exists */
1702 			status = (obj_hdl)->obj_ops->lookup(obj_hdl,
1703 							   name,
1704 							   new_obj,
1705 							   attrs_out);
1706 	
1707 			if (FSAL_IS_ERROR(status)) {
1708 				*new_obj = NULL;
1709 				goto direrr;
1710 			}
1711 	
1712 			myself = container_of(*new_obj,
1713 					      struct glusterfs_handle,
1714 					      handle);
1715 	
1716 			/* The open is not done with the caller's credentials so ask
1717 			 * the caller to perform a permission check.
1718 			 */
1719 			*caller_perm_check = true;
1720 			goto open;
1721 		}
1722 	
1723 		/* Become the user because we are creating an object in this dir.
1724 		 */
1725 		/* set proper credentials */
1726 		SET_GLUSTER_CREDS(glfs_export, &op_ctx->creds->caller_uid,
1727 				  &op_ctx->creds->caller_gid,
1728 				  op_ctx->creds->caller_glen,
1729 				  op_ctx->creds->caller_garray,
1730 				  op_ctx->client->addr.addr,
1731 				  op_ctx->client->addr.len);
1732 	
1733 		/** @todo: glfs_h_creat doesn't honour NO_CREATE mode. Instead use
1734 		 *  glfs_h_open to verify if the file already exists.
1735 		 */
1736 		glhandle =
1737 		    glfs_h_creat(glfs_export->gl_fs->fs, parenthandle->glhandle, name,
1738 				 p_flags, unix_mode, &sb);
1739 	
1740 		if (glhandle == NULL && errno == EEXIST &&
1741 			 createmode == FSAL_UNCHECKED) {
1742 			/* We tried to create O_EXCL to set attributes and failed.
1743 			 * Remove O_EXCL and retry, also remember not to set attributes.
1744 			 * We still try O_CREAT again just in case file disappears out
1745 			 * from under us.
1746 			 *
1747 			 * Note that because we have dropped O_EXCL, later on we will
1748 			 * not assume we created the file, and thus will not set
1749 			 * additional attributes. We don't need to separately track
1750 			 * the condition of not wanting to set attributes.
1751 			 */
1752 			p_flags &= ~O_EXCL;
1753 			glhandle =
1754 			    glfs_h_creat(glfs_export->gl_fs->fs, parenthandle->glhandle,
1755 					 name, p_flags, unix_mode, &sb);
1756 		} else if (!errno) {
1757 			created = true;
1758 		}
1759 	
1760 		/* restore credentials */
1761 		SET_GLUSTER_CREDS(glfs_export, NULL, NULL, 0, NULL, NULL, 0);
1762 	
1763 		if (glhandle == NULL) {
1764 			status = gluster2fsal_error(errno);
1765 			goto out;
1766 		}
1767 	
1768 		/* Remember if we were responsible for creating the file.
1769 		 * Note that in an UNCHECKED retry we MIGHT have re-created the
1770 		 * file and won't remember that. Oh well, so in that rare case we
1771 		 * leak a partially created file if we have a subsequent error in here.
1772 		 * Also notify caller to do permission check if we DID NOT create the
1773 		 * file. Note it IS possible in the case of a race between an UNCHECKED
1774 		 * open and an external unlink, we did create the file, but we will
1775 		 * still force a permission check. That permission check might fail
1776 		 * if the file created after the unlink has a mode that doesn't allow
1777 		 * the caller/creator to open the file (on the other hand, one hopes
1778 		 * a non-exclusive open doesn't set a mode that doesn't allow read/write
1779 		 * since the application clearly expects that another process may have
1780 		 * created the file). This failure case really isn't too awful since
1781 		 * it would just look to the caller like someone else had created the
1782 		 * file with a mode that prevented the open this caller was attempting.
1783 		 */
1784 	
1785 		/* Do a permission check if we were not attempting to create. If we
1786 		 * were attempting any sort of create, then the openat call was made
1787 		 * with the caller's credentials active and as such was permission
1788 		 * checked.
1789 		 */
1790 		*caller_perm_check = !created;
1791 	
1792 		/* Since the file is created, remove O_CREAT/O_EXCL flags */
1793 		p_flags &= ~(O_EXCL | O_CREAT);
1794 	
1795 		retval = glfs_h_extract_handle(glhandle, globjhdl, GFAPI_HANDLE_LENGTH);
1796 		if (retval < 0) {
1797 			status = gluster2fsal_error(errno);
1798 			goto direrr;
1799 		}
1800 	
1801 		retval = glfs_get_volumeid(glfs_export->gl_fs->fs, vol_uuid,
1802 					   GLAPI_UUID_LENGTH);
1803 		if (retval < 0) {
1804 			status = gluster2fsal_error(errno);
1805 			goto direrr;
1806 		}
1807 	
1808 		construct_handle(glfs_export, &sb, glhandle, globjhdl,
1809 				 &myself, vol_uuid);
1810 	
1811 		*new_obj = &myself->handle;
1812 	
1813 		/* If we didn't have a state above, use the global fd. At this point,
1814 		 * since we just created the global fd, no one else can have a
1815 		 * reference to it, and thus we can mamnipulate unlocked which is
1816 		 * handy since we can then call setattr2 which WILL take the lock
1817 		 * without a double locking deadlock.
1818 		 */
1819 		if (my_fd == NULL)
1820 			my_fd = &myself->globalfd;
1821 	
1822 	open:
1823 		/* now open it */
1824 		status = glusterfs_open_my_fd(myself, openflags, p_flags, my_fd);
1825 	
1826 		if (FSAL_IS_ERROR(status))
1827 			goto direrr;
1828 	
1829 		if (created && attrib_set->valid_mask != 0) {
1830 			/* Set attributes using our newly opened file descriptor as the
1831 			 * share_fd if there are any left to set (mode and truncate
1832 			 * have already been handled).
1833 			 *
1834 			 * Note that we only set the attributes if we were responsible
1835 			 * for creating the file and we have attributes to set.
1836 			 */
1837 			status = (*new_obj)->obj_ops->setattr2(*new_obj,
1838 							      false,
1839 							      state,
1840 							      attrib_set);
1841 	
1842 			if (FSAL_IS_ERROR(status))
1843 				goto fileerr;
1844 	
1845 			if (attrs_out != NULL) {
1846 				status = (*new_obj)->obj_ops->getattrs(*new_obj,
1847 								      attrs_out);
1848 				if (FSAL_IS_ERROR(status) &&
1849 				    (attrs_out->request_mask & ATTR_RDATTR_ERR) == 0) {
1850 					/* Get attributes failed and caller expected
1851 					 * to get the attributes. Otherwise continue
1852 					 * with attrs_out indicating ATTR_RDATTR_ERR.
1853 					 */
1854 					goto fileerr;
1855 				}
1856 			}
1857 		} else if (attrs_out != NULL) {
1858 			/* Since we haven't set any attributes other than what was set
1859 			 * on create (if we even created), just use the stat results
1860 			 * we used to create the fsal_obj_handle.
1861 			 */
1862 			posix2fsal_attributes_all(&sb, attrs_out);
1863 		}
1864 	
1865 	
1866 		if (state != NULL) {
1867 			/* Prepare to take the share reservation, but only if we are
1868 			 * called with a valid state (if state is NULL the caller is
1869 			 * a stateless create such as NFS v3 CREATE).
1870 			 */
1871 	
1872 			/* This can block over an I/O operation. */
1873 			PTHREAD_RWLOCK_wrlock(&(*new_obj)->obj_lock);
1874 	
1875 			/* Take the share reservation now by updating the counters. */
1876 			update_share_counters(&myself->share,
1877 					      FSAL_O_CLOSED,
1878 					      openflags);
1879 	
1880 			PTHREAD_RWLOCK_unlock(&(*new_obj)->obj_lock);
1881 		}
1882 	
1883 		return fsalstat(ERR_FSAL_NO_ERROR, 0);
1884 	
1885 	
1886 	fileerr:
1887 		/* Avoiding use after freed, make sure close my_fd before
1888 		 * obj_ops->release(), glfs_close is called depends on
1889 		 * FSAL_O_CLOSED flags, it's harmless of closing my_fd twice
1890 		 * in the floowing obj_ops->release().
1891 		 */
1892 		glusterfs_close_my_fd(my_fd);
1893 	
1894 	direrr:
1895 		/* Release the handle we just allocated. */
1896 		if (*new_obj) {
1897 			(*new_obj)->obj_ops->release(*new_obj);
1898 			/* We released handle at this point */
1899 			glhandle = NULL;
1900 			*new_obj = NULL;
1901 		}
1902 	
1903 		/* Delete the file if we actually created it. */
1904 		if (created) {
1905 			SET_GLUSTER_CREDS(glfs_export, &op_ctx->creds->caller_uid,
1906 					  &op_ctx->creds->caller_gid,
1907 					  op_ctx->creds->caller_glen,
1908 					  op_ctx->creds->caller_garray,
1909 					  op_ctx->client->addr.addr,
1910 					  op_ctx->client->addr.len);
1911 	
1912 			glfs_h_unlink(glfs_export->gl_fs->fs, parenthandle->glhandle,
1913 				      name);
1914 	
1915 			SET_GLUSTER_CREDS(glfs_export, NULL, NULL, 0, NULL, NULL, 0);
1916 		}
1917 	
1918 	
1919 		if (status.major != ERR_FSAL_NO_ERROR)
1920 			gluster_cleanup_vars(glhandle);
1921 	
1922 	 out:
1923 	#ifdef GLTIMING
1924 		now(&e_time);
1925 		latency_update(&s_time, &e_time, lat_file_open);
1926 	#endif
1927 		return status;
1928 	}
1929 	
1930 	/* reopen2
1931 	 */
1932 	
1933 	static fsal_status_t glusterfs_reopen2(struct fsal_obj_handle *obj_hdl,
1934 					       struct state_t *state,
1935 					       fsal_openflags_t openflags)
1936 	{
1937 		struct glusterfs_fd fd = {0}, *my_fd = &fd, *my_share_fd = NULL;
1938 		struct glusterfs_handle *myself;
1939 		fsal_status_t status = {0, 0};
1940 		int posix_flags = 0;
1941 		fsal_openflags_t old_openflags;
1942 	
1943 		my_share_fd = &container_of(state, struct glusterfs_state_fd,
1944 					    state)->glusterfs_fd;
1945 	
1946 		fsal2posix_openflags(openflags, &posix_flags);
1947 	
1948 		memset(my_fd, 0, sizeof(*my_fd));
1949 	
1950 		myself  = container_of(obj_hdl,
1951 				       struct glusterfs_handle,
1952 				       handle);
1953 	
1954 	#if 0
1955 		/** @todo: fsid work */
1956 		if (obj_hdl->fsal != obj_hdl->fs->fsal) {
1957 			LogDebug(COMPONENT_FSAL,
1958 				 "FSAL %s operation for handle belonging to FSAL %s, return EXDEV",
1959 				 obj_hdl->fsal->name, obj_hdl->fs->fsal->name);
1960 			return fsalstat(posix2fsal_error(EXDEV), EXDEV);
1961 		}
1962 	#endif
1963 	
1964 		/* This can block over an I/O operation. */
1965 		PTHREAD_RWLOCK_wrlock(&obj_hdl->obj_lock);
1966 	
1967 		old_openflags = my_share_fd->openflags;
1968 	
1969 		/* We can conflict with old share, so go ahead and check now. */
1970 		status = check_share_conflict(&myself->share, openflags, false);
1971 	
1972 		if (FSAL_IS_ERROR(status)) {
1973 			PTHREAD_RWLOCK_unlock(&obj_hdl->obj_lock);
1974 			return status;
1975 		}
1976 	
1977 		/* Set up the new share so we can drop the lock and not have a
1978 		 * conflicting share be asserted, updating the share counters.
1979 		 */
1980 		update_share_counters(&myself->share, old_openflags, openflags);
1981 	
1982 		PTHREAD_RWLOCK_unlock(&obj_hdl->obj_lock);
1983 	
1984 		status = glusterfs_open_my_fd(myself, openflags, posix_flags, my_fd);
1985 	
1986 		if (!FSAL_IS_ERROR(status)) {
1987 			/* Close the existing file descriptor and copy the new
1988 			 * one over. Make sure no one is using the fd that we are
1989 			 * about to close!
1990 			 */
1991 			PTHREAD_RWLOCK_wrlock(&my_share_fd->fdlock);
1992 	
1993 			glusterfs_close_my_fd(my_share_fd);
1994 			my_share_fd->glfd = my_fd->glfd;
1995 			my_share_fd->openflags = my_fd->openflags;
1996 			my_share_fd->creds.caller_uid = my_fd->creds.caller_uid;
1997 			my_share_fd->creds.caller_gid = my_fd->creds.caller_gid;
1998 			my_share_fd->creds.caller_glen = my_fd->creds.caller_glen;
1999 			my_share_fd->creds.caller_garray = my_fd->creds.caller_garray;
2000 	#ifdef USE_GLUSTER_DELEGATION
2001 			memcpy(my_share_fd->lease_id, my_fd->lease_id,
2002 			       GLAPI_LEASE_ID_SIZE);
2003 	#endif
2004 	
2005 			PTHREAD_RWLOCK_unlock(&my_share_fd->fdlock);
2006 		} else {
2007 			/* We had a failure on open - we need to revert the share.
2008 			 * This can block over an I/O operation.
2009 			 */
2010 			PTHREAD_RWLOCK_wrlock(&obj_hdl->obj_lock);
2011 	
2012 			update_share_counters(&myself->share,
2013 					      openflags,
2014 					      old_openflags);
2015 	
2016 			PTHREAD_RWLOCK_unlock(&obj_hdl->obj_lock);
2017 		}
2018 	
2019 		return status;
2020 	}
2021 	
2022 	/* read2
2023 	 */
2024 	
2025 	static void glusterfs_read2(struct fsal_obj_handle *obj_hdl,
2026 				    bool bypass,
2027 				    fsal_async_cb done_cb,
2028 				    struct fsal_io_arg *read_arg,
2029 				    void *caller_arg)
2030 	{
2031 		struct glusterfs_fd my_fd = {0};
2032 		ssize_t nb_read;
2033 		fsal_status_t status;
2034 		int retval = 0;
2035 		bool has_lock = false;
2036 		bool closefd = false;
2037 		struct glusterfs_export *glfs_export =
2038 		    container_of(op_ctx->fsal_export, struct glusterfs_export, export);
2039 		struct glusterfs_fd *glusterfs_fd = NULL;
2040 		size_t total_size = 0;
2041 		uint64_t seek_descriptor = read_arg->offset;
2042 		int i;
2043 	
2044 		if (read_arg->info != NULL) {
2045 			/* Currently we don't support READ_PLUS */
2046 			done_cb(obj_hdl, fsalstat(ERR_FSAL_NOTSUPP, 0), read_arg,
2047 				caller_arg);
2048 			return;
2049 		}
2050 	
2051 	#if 0
2052 		/** @todo: fsid work */
2053 		if (obj_hdl->fsal != obj_hdl->fs->fsal) {
2054 			LogDebug(COMPONENT_FSAL,
2055 				 "FSAL %s operation for handle belonging to FSAL %s, return EXDEV",
2056 				 obj_hdl->fsal->name, obj_hdl->fs->fsal->name);
2057 			return fsalstat(posix2fsal_error(EXDEV), EXDEV);
2058 		}
2059 	#endif
2060 	
2061 		/* Acquire state's fdlock to prevent OPEN upgrade closing the
2062 		 * file descriptor while we use it.
2063 		 */
2064 		if (read_arg->state) {
2065 			glusterfs_fd = &container_of(read_arg->state,
2066 						     struct glusterfs_state_fd,
2067 						     state)->glusterfs_fd;
2068 	
2069 			PTHREAD_RWLOCK_rdlock(&glusterfs_fd->fdlock);
2070 		}
2071 	
2072 		/* Get a usable file descriptor */
2073 		status = find_fd(&my_fd, obj_hdl, bypass, read_arg->state, FSAL_O_READ,
2074 				 &has_lock, &closefd, false);
2075 	
2076 		if (FSAL_IS_ERROR(status))
2077 			goto out;
2078 	
2079 		SET_GLUSTER_CREDS(glfs_export, &op_ctx->creds->caller_uid,
2080 				  &op_ctx->creds->caller_gid,
2081 				  op_ctx->creds->caller_glen,
2082 				  op_ctx->creds->caller_garray,
2083 				  op_ctx->client->addr.addr,
2084 				  op_ctx->client->addr.len);
2085 	
2086 		/* XXX dang switch to preadv_async once async supported */
2087 		nb_read = glfs_preadv(my_fd.glfd, read_arg->iov, read_arg->iov_count,
2088 				      seek_descriptor, 0);
2089 	
2090 		/* restore credentials */
2091 		SET_GLUSTER_CREDS(glfs_export, NULL, NULL, 0, NULL, NULL, 0);
2092 	
2093 		if (seek_descriptor == -1 || nb_read == -1) {
2094 			retval = errno;
2095 			status = fsalstat(posix2fsal_error(retval), retval);
2096 			goto out;
2097 		}
2098 	
2099 		read_arg->io_amount = nb_read;
2100 	
2101 		for (i = 0; i < read_arg->iov_count; i++) {
2102 			total_size += read_arg->iov[i].iov_len;
2103 		}
2104 	
2105 		if (nb_read < total_size)
2106 			read_arg->end_of_file = true;
2107 	#if 0
2108 		/** @todo
2109 		 *
2110 		 * Is this all we really need to do to support READ_PLUS? Will anyone
2111 		 * ever get upset that we don't return holes, even for blocks of all
2112 		 * zeroes?
2113 		 *
2114 		 */
2115 		if (info != NULL) {
2116 			info->io_content.what = NFS4_CONTENT_DATA;
2117 			info->io_content.data.d_offset = offset + nb_read;
2118 			info->io_content.data.d_data.data_len = nb_read;
2119 			info->io_content.data.d_data.data_val = buffer;
2120 		}
2121 	#endif
2122 	
2123 	 out:
2124 	
2125 		if (glusterfs_fd)
2126 			PTHREAD_RWLOCK_unlock(&glusterfs_fd->fdlock);
2127 	
2128 		if (closefd)
2129 			glusterfs_close_my_fd(&my_fd);
2130 	
2131 		if (has_lock)
2132 			PTHREAD_RWLOCK_unlock(&obj_hdl->obj_lock);
2133 	
2134 		done_cb(obj_hdl, status, read_arg, caller_arg);
2135 	}
2136 	
2137 	/* write2
2138 	 */
2139 	
2140 	static void glusterfs_write2(struct fsal_obj_handle *obj_hdl,
2141 				     bool bypass,
2142 				     fsal_async_cb done_cb,
2143 				     struct fsal_io_arg *write_arg,
2144 				     void *caller_arg)
2145 	{
2146 		ssize_t nb_written;
2147 		fsal_status_t status;
2148 		int retval = 0;
2149 		struct glusterfs_fd my_fd = {0};
2150 		bool has_lock = false;
2151 		bool closefd = false;
2152 		fsal_openflags_t openflags = FSAL_O_WRITE;
2153 		struct glusterfs_export *glfs_export =
2154 		     container_of(op_ctx->fsal_export, struct glusterfs_export, export);
2155 		struct glusterfs_fd *glusterfs_fd = NULL;
2156 	
2157 	#if 0
2158 		/** @todo: fsid work */
2159 		if (obj_hdl->fsal != obj_hdl->fs->fsal) {
2160 			LogDebug(COMPONENT_FSAL,
2161 				 "FSAL %s operation for handle belonging to FSAL %s, return EXDEV",
2162 				 obj_hdl->fsal->name, obj_hdl->fs->fsal->name);
2163 			return fsalstat(posix2fsal_error(EXDEV), EXDEV);
2164 		}
2165 	#endif
2166 	
2167 		/* Acquire state's fdlock to prevent OPEN upgrade closing the
2168 		 * file descriptor while we use it.
2169 		 */
2170 		if (write_arg->state) {
2171 			glusterfs_fd = &container_of(write_arg->state, struct
2172 						     glusterfs_state_fd,
2173 						     state)->glusterfs_fd;
2174 	
2175 			PTHREAD_RWLOCK_rdlock(&glusterfs_fd->fdlock);
2176 		}
2177 	
2178 		/* Get a usable file descriptor */
2179 		status = find_fd(&my_fd, obj_hdl, bypass, write_arg->state, openflags,
2180 				 &has_lock, &closefd, false);
2181 	
2182 		if (FSAL_IS_ERROR(status))
2183 			goto out;
2184 	
2185 		SET_GLUSTER_CREDS(glfs_export, &op_ctx->creds->caller_uid,
2186 				  &op_ctx->creds->caller_gid,
2187 				  op_ctx->creds->caller_glen,
2188 				  op_ctx->creds->caller_garray,
2189 				  op_ctx->client->addr.addr,
2190 				  op_ctx->client->addr.len);
2191 	
2192 		/* XXX dang switch to pwritev_async once async supported */
2193 		nb_written = glfs_pwritev(my_fd.glfd, write_arg->iov,
2194 					  write_arg->iov_count, write_arg->offset,
2195 					  (write_arg->fsal_stable ? O_SYNC : 0));
2196 	
2197 		/* restore credentials */
2198 		SET_GLUSTER_CREDS(glfs_export, NULL, NULL, 0, NULL, NULL, 0);
2199 	
2200 		if (nb_written == -1) {
2201 			retval = errno;
2202 			status = fsalstat(posix2fsal_error(retval), retval);
2203 			goto out;
2204 		}
2205 	
2206 		write_arg->io_amount = nb_written;
2207 	
2208 	 out:
2209 	
2210 		if (glusterfs_fd)
2211 			PTHREAD_RWLOCK_unlock(&glusterfs_fd->fdlock);
2212 	
2213 		if (closefd)
2214 			glusterfs_close_my_fd(&my_fd);
2215 	
2216 		if (has_lock)
2217 			PTHREAD_RWLOCK_unlock(&obj_hdl->obj_lock);
2218 	
2219 		done_cb(obj_hdl, status, write_arg, caller_arg);
2220 	}
2221 	
2222 	/**
2223 	 * @brief Implements GLUSTER FSAL objectoperation seek (DATA/HOLE)
2224 	 * seek2
2225 	 */
2226 	static fsal_status_t seek2(struct fsal_obj_handle *obj_hdl,
2227 					struct state_t *state,
2228 					struct io_info *info)
2229 	{
2230 		off_t ret = 0, offset = info->io_content.hole.di_offset;
2231 		int what = 0;
2232 		bool has_lock = false;
2233 		bool closefd = false;
2234 		fsal_openflags_t openflags = FSAL_O_ANY;
2235 		fsal_status_t status = { ERR_FSAL_NO_ERROR, 0 };
2236 		struct glusterfs_fd my_fd = {0};
2237 		struct stat sbuf = {0};
2238 		struct glusterfs_export *glfs_export =
2239 		   container_of(op_ctx->fsal_export, struct glusterfs_export, export);
2240 	
2241 	#ifdef GLTIMING
2242 		struct timespec s_time, e_time;
2243 	
2244 		now(&s_time);
2245 	#endif
2246 	
2247 		/* Get a usable file descriptor */
2248 		status = find_fd(&my_fd, obj_hdl, false, state, openflags,
2249 			&has_lock, &closefd, false);
2250 	
2251 		if (FSAL_IS_ERROR(status))
2252 			goto out;
2253 	
2254 		ret = glfs_fstat(my_fd.glfd, &sbuf);
2255 		if (ret != 0) {
2256 			if (errno == EBADF)
2257 				errno = ESTALE;
2258 			status = gluster2fsal_error(errno);
2259 			goto out;
2260 		}
2261 	
2262 		/* RFC7862 15.11.3,
2263 		 * If the sa_offset is beyond the end of the file,
2264 		 * then SEEK MUST return NFS4ERR_NXIO. */
2265 		if (offset >= sbuf.st_size) {
2266 			status = gluster2fsal_error(ENXIO);
2267 			goto out;
2268 		}
2269 	
2270 		SET_GLUSTER_CREDS(glfs_export, &op_ctx->creds->caller_uid,
2271 				&op_ctx->creds->caller_gid,
2272 				op_ctx->creds->caller_glen,
2273 				op_ctx->creds->caller_garray,
2274 				op_ctx->client->addr.addr,
2275 				op_ctx->client->addr.len);
2276 	
2277 		if (info->io_content.what == NFS4_CONTENT_DATA) {
2278 			what = SEEK_DATA;
2279 		} else if (info->io_content.what == NFS4_CONTENT_HOLE) {
2280 			what = SEEK_HOLE;
2281 		} else {
2282 			status = fsalstat(ERR_FSAL_UNION_NOTSUPP, 0);
2283 			goto out;
2284 		}
2285 	
2286 		ret = glfs_lseek(my_fd.glfd, offset, what);
2287 	
2288 		 /* restore credentials */
2289 		SET_GLUSTER_CREDS(glfs_export, NULL, NULL, 0, NULL, NULL, 0);
2290 	
2291 		if (ret < 0) {
2292 			if (errno == ENXIO) {
2293 				info->io_eof = TRUE;
2294 			} else {
2295 				status = gluster2fsal_error(errno);
2296 			}
2297 			goto out;
2298 		} else {
2299 			info->io_eof = (ret >= sbuf.st_size) ? TRUE : FALSE;
2300 			info->io_content.hole.di_offset = ret;
2301 		}
2302 	
2303 	 out:
2304 	#ifdef GLTIMING
2305 		now(&e_time);
2306 		latency_update(&s_time, &e_time, lat_file_seek);
2307 	#endif
2308 	
2309 		if (closefd)
2310 			glusterfs_close_my_fd(&my_fd);
2311 	
2312 		if (has_lock)
2313 			PTHREAD_RWLOCK_unlock(&obj_hdl->obj_lock);
2314 	
2315 		return status;
2316 	}
2317 	
2318 	
2319 	/* commit2
2320 	 */
2321 	
2322 	static fsal_status_t glusterfs_commit2(struct fsal_obj_handle *obj_hdl,
2323 					       off_t offset,
2324 					       size_t len)
2325 	{
2326 		fsal_status_t status;
2327 		int retval;
2328 		struct glusterfs_fd tmp_fd = {
2329 				FSAL_O_CLOSED, PTHREAD_RWLOCK_INITIALIZER, NULL };
2330 		struct glusterfs_fd *out_fd = &tmp_fd;
2331 		struct glusterfs_handle *myself = NULL;
2332 		bool has_lock = false;
2333 		bool closefd = false;
2334 		struct glusterfs_export *glfs_export =
2335 		    container_of(op_ctx->fsal_export,
2336 				 struct glusterfs_export, export);
2337 	
2338 		myself = container_of(obj_hdl, struct glusterfs_handle, handle);
2339 	
2340 		/* Make sure file is open in appropriate mode.
2341 		 * Do not check share reservation.
2342 		 */
2343 		status = fsal_reopen_obj(obj_hdl, false, false, FSAL_O_WRITE,
2344 					 (struct fsal_fd *)&myself->globalfd,
2345 					 &myself->share, glusterfs_open_func,
2346 					 glusterfs_close_func,
2347 					 (struct fsal_fd **)&out_fd,
2348 					 &has_lock, &closefd);
2349 	
2350 		if (!FSAL_IS_ERROR(status)) {
2351 	
2352 			SET_GLUSTER_CREDS(glfs_export, &op_ctx->creds->caller_uid,
2353 					  &op_ctx->creds->caller_gid,
2354 					  op_ctx->creds->caller_glen,
2355 					  op_ctx->creds->caller_garray,
2356 					  op_ctx->client->addr.addr,
2357 					  op_ctx->client->addr.len);
2358 	#ifdef USE_GLUSTER_STAT_FETCH_API
2359 			retval = glfs_fsync(out_fd->glfd, NULL, NULL);
2360 	#else
2361 			retval = glfs_fsync(out_fd->glfd);
2362 	#endif
2363 	
2364 			if (retval == -1) {
2365 				retval = errno;
2366 				status = fsalstat(posix2fsal_error(retval), retval);
2367 			}
2368 	
2369 			/* restore credentials */
2370 			SET_GLUSTER_CREDS(glfs_export, NULL, NULL, 0, NULL, NULL, 0);
2371 		}
2372 	
2373 		if (closefd)
2374 			glusterfs_close_my_fd(out_fd);
2375 	
2376 		if (has_lock)
2377 			PTHREAD_RWLOCK_unlock(&obj_hdl->obj_lock);
2378 	
2379 		return status;
2380 	}
2381 	
2382 	/* lock_op2
2383 	 */
2384 	
2385 	static fsal_status_t glusterfs_lock_op2(struct fsal_obj_handle *obj_hdl,
2386 						struct state_t *state,
2387 						void *p_owner,
2388 						fsal_lock_op_t lock_op,
2389 						fsal_lock_param_t *request_lock,
2390 						fsal_lock_param_t *conflicting_lock)
2391 	{
2392 		struct flock lock_args;
2393 		int fcntl_comm;
2394 		fsal_status_t status = {0, 0};
2395 		int retval = 0;
2396 		struct glusterfs_fd my_fd = {0};
2397 		bool has_lock = false;
2398 		bool closefd = false;
2399 		bool bypass = false;
2400 		fsal_openflags_t openflags = FSAL_O_RDWR;
2401 		struct glusterfs_export *glfs_export =
2402 		    container_of(op_ctx->fsal_export,
2403 				 struct glusterfs_export, export);
2404 		struct glusterfs_fd *glusterfs_fd = NULL;
2405 	
2406 	#if 0
2407 		/** @todo: fsid work */
2408 		if (obj_hdl->fsal != obj_hdl->fs->fsal) {
2409 			LogDebug(COMPONENT_FSAL,
2410 				 "FSAL %s operation for handle belonging to FSAL %s, return EXDEV",
2411 				 obj_hdl->fsal->name, obj_hdl->fs->fsal->name);
2412 			return fsalstat(posix2fsal_error(EXDEV), EXDEV);
2413 		}
2414 	#endif
2415 	
2416 		LogFullDebug(COMPONENT_FSAL,
2417 			     "Locking: op(%d) type(%d) start(%" PRIu64
2418 			     ") length(%" PRIu64 ")",
2419 			     lock_op, request_lock->lock_type, request_lock->lock_start,
2420 			     request_lock->lock_length);
2421 	
2422 		if (lock_op == FSAL_OP_LOCKT) {
2423 			/* We may end up using global fd, don't fail on a deny mode */
2424 			bypass = true;
2425 			fcntl_comm = F_GETLK;
2426 			openflags = FSAL_O_ANY;
2427 		} else if (lock_op == FSAL_OP_LOCK) {
2428 			fcntl_comm = F_SETLK;
2429 	
2430 			if (request_lock->lock_type == FSAL_LOCK_R)
2431 				openflags = FSAL_O_READ;
2432 			else if (request_lock->lock_type == FSAL_LOCK_W)
2433 				openflags = FSAL_O_WRITE;
2434 		} else if (lock_op == FSAL_OP_UNLOCK) {
2435 			fcntl_comm = F_SETLK;
2436 			openflags = FSAL_O_ANY;
2437 		} else {
2438 			LogDebug(COMPONENT_FSAL,
2439 				 "ERROR: Lock operation requested was not TEST, READ, or WRITE.");
2440 			return fsalstat(ERR_FSAL_NOTSUPP, 0);
2441 		}
2442 	
2443 		if (lock_op != FSAL_OP_LOCKT && state == NULL) {
2444 			LogCrit(COMPONENT_FSAL, "Non TEST operation with NULL state");
2445 			return fsalstat(posix2fsal_error(EINVAL), EINVAL);
2446 		}
2447 	
2448 		if (request_lock->lock_type == FSAL_LOCK_R) {
2449 			lock_args.l_type = F_RDLCK;
2450 		} else if (request_lock->lock_type == FSAL_LOCK_W) {
2451 			lock_args.l_type = F_WRLCK;
2452 		} else {
2453 			LogDebug(COMPONENT_FSAL,
2454 				 "ERROR: The requested lock type was not read or write.");
2455 			return fsalstat(ERR_FSAL_NOTSUPP, 0);
2456 		}
2457 	
2458 		if (lock_op == FSAL_OP_UNLOCK)
2459 			lock_args.l_type = F_UNLCK;
2460 	
2461 		lock_args.l_pid = 0;
2462 		lock_args.l_len = request_lock->lock_length;
2463 		lock_args.l_start = request_lock->lock_start;
2464 		lock_args.l_whence = SEEK_SET;
2465 	
2466 		/* flock.l_len being signed long integer, larger lock ranges may
2467 		 * get mapped to negative values. As per 'man 3 fcntl', posix
2468 		 * locks can accept negative l_len values which may lead to
2469 		 * unlocking an unintended range. Better bail out to prevent that.
2470 		 */
2471 		if (lock_args.l_len < 0) {
2472 			LogCrit(COMPONENT_FSAL,
2473 				"The requested lock length is out of range: lock_args.l_len(%"
2474 				PRId64 "), request_lock_length(%" PRIu64 ")",
2475 				lock_args.l_len, request_lock->lock_length);
2476 			return fsalstat(ERR_FSAL_BAD_RANGE, 0);
2477 		}
2478 	
2479 		/* Acquire state's fdlock to prevent OPEN upgrade closing the
2480 		 * file descriptor while we use it.
2481 		 */
2482 		if (state) {
2483 			glusterfs_fd = &container_of(state, struct glusterfs_state_fd,
2484 						     state)->glusterfs_fd;
2485 	
2486 			PTHREAD_RWLOCK_rdlock(&glusterfs_fd->fdlock);
2487 		}
2488 	
2489 		/* Get a usable file descriptor */
2490 		status = find_fd(&my_fd, obj_hdl, bypass, state, openflags,
2491 				 &has_lock, &closefd,
2492 				 (state &&
2493 				  ((state->state_type == STATE_TYPE_NLM_LOCK) ||
2494 				  (state->state_type == STATE_TYPE_9P_FID))));
2495 	
2496 		if (FSAL_IS_ERROR(status)) {
2497 			LogCrit(COMPONENT_FSAL, "Unable to find fd for lock operation");
2498 			goto err;
2499 		}
2500 	
2501 		errno = 0;
2502 		SET_GLUSTER_CREDS(glfs_export, &my_fd.creds.caller_uid,
2503 				  &my_fd.creds.caller_gid,
2504 				  my_fd.creds.caller_glen,
2505 				  my_fd.creds.caller_garray,
2506 	#ifdef USE_GLUSTER_DELEGATION
2507 				  my_fd.lease_id,
2508 				  GLAPI_LEASE_ID_SIZE);
2509 	#else
2510 				  NULL, 0);
2511 	#endif
2512 	
2513 		/* Convert lkowner ptr address to opaque string */
(1) Event suspicious_sizeof: Passing argument "p_owner" of type "void *" and argument "8 /* sizeof (p_owner) */" to function "glfs_fd_set_lkowner" is suspicious.
2514 		retval = glfs_fd_set_lkowner(my_fd.glfd, p_owner, sizeof(p_owner));
2515 		if (retval) {
2516 			LogCrit(COMPONENT_FSAL,
2517 				"Setting lkowner failed");
2518 			goto err;
2519 		}
2520 	
2521 		retval = glfs_posix_lock(my_fd.glfd, fcntl_comm, &lock_args);
2522 	
2523 		if (retval /* && lock_op == FSAL_OP_LOCK */) {
2524 			retval = errno;
2525 			int rc = 0;
2526 	
2527 			LogDebug(COMPONENT_FSAL,
2528 				 "fcntl returned %d %s",
2529 				 retval, strerror(retval));
2530 	
2531 			if (conflicting_lock != NULL) {
2532 				/* Get the conflicting lock */
2533 	
2534 				rc = glfs_fd_set_lkowner(my_fd.glfd, p_owner,
2535 							 sizeof(p_owner));
2536 				if (rc) {
2537 					retval = errno; /* we losethe initial error */
2538 					LogCrit(COMPONENT_FSAL,
2539 						"Setting lkowner while trying to get conflicting lock failed");
2540 					goto err;
2541 				}
2542 	
2543 				rc = glfs_posix_lock(my_fd.glfd, F_GETLK,
2544 							 &lock_args);
2545 	
2546 				if (rc) {
2547 					retval = errno; /* we lose the initial error */
2548 					LogCrit(COMPONENT_FSAL,
2549 						"After failing a lock request, I couldn't even get the details of who owns the lock.");
2550 					goto err;
2551 				}
2552 	
2553 				conflicting_lock->lock_length = lock_args.l_len;
2554 				conflicting_lock->lock_start = lock_args.l_start;
2555 				conflicting_lock->lock_type = lock_args.l_type;
2556 			}
2557 	
2558 			goto err;
2559 		}
2560 	
2561 		/* F_UNLCK is returned then the tested operation would be possible. */
2562 		if (conflicting_lock != NULL) {
2563 			if (lock_op == FSAL_OP_LOCKT && lock_args.l_type != F_UNLCK) {
2564 				conflicting_lock->lock_length = lock_args.l_len;
2565 				conflicting_lock->lock_start = lock_args.l_start;
2566 				conflicting_lock->lock_type = lock_args.l_type;
2567 			} else {
2568 				conflicting_lock->lock_length = 0;
2569 				conflicting_lock->lock_start = 0;
2570 				conflicting_lock->lock_type = FSAL_NO_LOCK;
2571 			}
2572 		}
2573 	
2574 	
2575 		/* Fall through (retval == 0) */
2576 	
2577 	 err:
2578 		SET_GLUSTER_CREDS(glfs_export, NULL, NULL, 0, NULL, NULL, 0);
2579 	
2580 		if (glusterfs_fd)
2581 			PTHREAD_RWLOCK_unlock(&glusterfs_fd->fdlock);
2582 	
2583 		if (closefd)
2584 			glusterfs_close_my_fd(&my_fd);
2585 	
2586 		if (has_lock)
2587 			PTHREAD_RWLOCK_unlock(&obj_hdl->obj_lock);
2588 	
2589 		if (retval)
2590 			status = gluster2fsal_error(retval);
2591 	
2592 		return status;
2593 	}
2594 	
2595 	#ifdef USE_GLUSTER_DELEGATION
2596 	static fsal_status_t glusterfs_lease_op2(struct fsal_obj_handle *obj_hdl,
2597 						 struct state_t *state,
2598 						 void *owner,
2599 						 fsal_deleg_t deleg)
2600 	{
2601 		int retval = 0;
2602 		fsal_status_t status = {ERR_FSAL_NO_ERROR, 0};
2603 		struct glusterfs_fd my_fd = {0};
2604 		bool has_lock = false;
2605 		bool closefd = false;
2606 		bool bypass = false;
2607 		fsal_openflags_t openflags = FSAL_O_RDWR;
2608 		struct glusterfs_handle *myself = NULL;
2609 		struct glusterfs_fd *glusterfs_fd = NULL;
2610 		struct glfs_lease lease = {0,};
2611 		struct glusterfs_export *glfs_export =
2612 		    container_of(op_ctx->fsal_export, struct glusterfs_export, export);
2613 	
2614 		myself = container_of(obj_hdl, struct glusterfs_handle, handle);
2615 	
2616 		switch (deleg) {
2617 		case FSAL_DELEG_NONE:
2618 			lease.cmd = GLFS_UNLK_LEASE;
2619 			openflags = FSAL_O_ANY;
2620 			/* 'myself' should contain the lease_type obtained.
2621 			 * If not, we had already unlocked the lease and this is
2622 			 * duplicate request. Return as noop. */
2623 			if (myself->lease_type == 0) {
2624 				LogDebug(COMPONENT_FSAL,
2625 					"No lease found to unlock");
2626 				return status;
2627 			}
2628 			lease.lease_type = myself->lease_type;
2629 			break;
2630 		case FSAL_DELEG_RD:
2631 			lease.cmd = GLFS_SET_LEASE;
2632 			openflags = FSAL_O_READ;
2633 			lease.lease_type = GLFS_RD_LEASE;
2634 			break;
2635 		case FSAL_DELEG_WR:
2636 			lease.cmd = GLFS_SET_LEASE;
2637 			openflags = FSAL_O_WRITE;
2638 			lease.lease_type = GLFS_RW_LEASE;
2639 			break;
2640 		default:
2641 			LogCrit(COMPONENT_FSAL, "Unknown requested lease state");
2642 			return gluster2fsal_error(EINVAL);
2643 		}
2644 	
2645 		/* Acquire state's fdlock to prevent OPEN upgrade closing the
2646 		 * file descriptor while we use it.
2647 		 */
2648 		if (state) {
2649 			glusterfs_fd = &container_of(state, struct glusterfs_state_fd,
2650 						     state)->glusterfs_fd;
2651 	
2652 			PTHREAD_RWLOCK_rdlock(&glusterfs_fd->fdlock);
2653 		}
2654 	
2655 		/* Get a usable file descriptor */
2656 		status = find_fd(&my_fd, obj_hdl, bypass, state, openflags,
2657 				 &has_lock, &closefd, true);
2658 	
2659 		if (FSAL_IS_ERROR(status)) {
2660 			LogCrit(COMPONENT_FSAL,
2661 				"Unable to find fd for lease operation");
2662 			goto err;
2663 		}
2664 	
2665 		/* Since we open unique fd for each NFSv4.x OPEN
2666 		 * operation, we should have had lease_id set
2667 		 */
2668 		memcpy(lease.lease_id, my_fd.lease_id, GLAPI_LEASE_ID_SIZE);
2669 	
2670 		errno = 0;
2671 		SET_GLUSTER_CREDS(glfs_export, &my_fd.creds.caller_uid,
2672 				  &my_fd.creds.caller_gid,
2673 				  my_fd.creds.caller_glen,
2674 				  my_fd.creds.caller_garray,
2675 				  my_fd.lease_id,
2676 				  GLAPI_LEASE_ID_SIZE);
2677 		retval = glfs_lease(my_fd.glfd, &lease, NULL, NULL);
2678 	
2679 		if (retval) {
2680 			retval = errno;
2681 			LogWarn(COMPONENT_FSAL, "Unable to %s lease",
2682 				(deleg == FSAL_DELEG_NONE) ?
2683 				 "release" : "acquire");
2684 		} else {
2685 			if (deleg == FSAL_DELEG_NONE) { /* reset lease_type */
2686 				myself->lease_type = 0;
2687 			} else {
2688 				myself->lease_type = lease.lease_type;
2689 			}
2690 		}
2691 	
2692 		SET_GLUSTER_CREDS(glfs_export, NULL, NULL, 0, NULL, NULL, 0);
2693 	
2694 	err:
2695 		if (closefd)
2696 			glusterfs_close_my_fd(&my_fd);
2697 	
2698 		if (glusterfs_fd)
2699 			PTHREAD_RWLOCK_unlock(&glusterfs_fd->fdlock);
2700 	
2701 		if (has_lock)
2702 			PTHREAD_RWLOCK_unlock(&obj_hdl->obj_lock);
2703 	
2704 		if (retval)
2705 			status = gluster2fsal_error(retval);
2706 	
2707 		return status;
2708 	}
2709 	#endif
2710 	
2711 	/**
2712 	 * @brief Set attributes on an object
2713 	 *
2714 	 * This function sets attributes on an object.  Which attributes are
2715 	 * set is determined by attrib_set->valid_mask. The FSAL must manage bypass
2716 	 * or not of share reservations, and a state may be passed.
2717 	 *
2718 	 * @param[in] obj_hdl    File on which to operate
2719 	 * @param[in] state      state_t to use for this operation
2720 	 * @param[in] attrib_set Attributes to set
2721 	 *
2722 	 * @return FSAL status.
2723 	 */
2724 	
2725 	static fsal_status_t glusterfs_setattr2(struct fsal_obj_handle *obj_hdl,
2726 						bool bypass,
2727 						struct state_t *state,
2728 						struct attrlist *attrib_set)
2729 	{
2730 		struct glusterfs_handle *myself;
2731 		fsal_status_t status = {0, 0};
2732 		int retval = 0;
2733 		fsal_openflags_t openflags = FSAL_O_ANY;
2734 		bool has_lock = false;
2735 		bool closefd = false;
2736 		struct glusterfs_fd my_fd = {0};
2737 		struct glusterfs_export *glfs_export =
2738 		    container_of(op_ctx->fsal_export, struct glusterfs_export, export);
2739 		glusterfs_fsal_xstat_t buffxstat = { 0 };
2740 		int attr_valid = 0;
2741 		int mask = 0;
2742 		struct glusterfs_fd *glusterfs_fd = NULL;
2743 	
2744 		/** @todo: Handle special file symblic links etc */
2745 		/* apply umask, if mode attribute is to be changed */
2746 		if (FSAL_TEST_MASK(attrib_set->valid_mask, ATTR_MODE))
2747 			attrib_set->mode &=
2748 			    ~op_ctx->fsal_export->exp_ops.fs_umask(op_ctx->fsal_export);
2749 	
2750 		myself = container_of(obj_hdl, struct glusterfs_handle, handle);
2751 	
2752 	#if 0
2753 		/** @todo: fsid work */
2754 		if (obj_hdl->fsal != obj_hdl->fs->fsal) {
2755 			LogDebug(COMPONENT_FSAL,
2756 				 "FSAL %s operation for handle belonging to FSAL %s, return EXDEV",
2757 				 obj_hdl->fsal->name, obj_hdl->fs->fsal->name);
2758 			return fsalstat(posix2fsal_error(EXDEV), EXDEV);
2759 		}
2760 	#endif
2761 	
2762 		/* Test if size is being set, make sure file is regular and if so,
2763 		 * require a read/write file descriptor.
2764 		 */
2765 		if (FSAL_TEST_MASK(attrib_set->valid_mask, ATTR_SIZE)) {
2766 			if (obj_hdl->type != REGULAR_FILE)
2767 				return fsalstat(ERR_FSAL_INVAL, EINVAL);
2768 			openflags = FSAL_O_RDWR;
2769 		}
2770 	
2771 		/** TRUNCATE **/
2772 		if (FSAL_TEST_MASK(attrib_set->valid_mask, ATTR_SIZE) &&
2773 		    (obj_hdl->type == REGULAR_FILE)) {
2774 			/* Acquire state's fdlock to prevent OPEN upgrade closing the
2775 			 * file descriptor while we use it.
2776 			 */
2777 			if (state) {
2778 				glusterfs_fd = &container_of(state,
2779 							     struct glusterfs_state_fd,
2780 							     state)->glusterfs_fd;
2781 	
2782 				PTHREAD_RWLOCK_rdlock(&glusterfs_fd->fdlock);
2783 			}
2784 	
2785 			/* Get a usable file descriptor. Share conflict is only
2786 			 * possible if size is being set. For special files,
2787 			 * handle via handle.
2788 			 */
2789 			status = find_fd(&my_fd, obj_hdl, bypass, state, openflags,
2790 					 &has_lock, &closefd, false);
2791 	
2792 			if (FSAL_IS_ERROR(status))
2793 				goto out;
2794 	
2795 			SET_GLUSTER_CREDS(glfs_export, &op_ctx->creds->caller_uid,
2796 					  &op_ctx->creds->caller_gid,
2797 					  op_ctx->creds->caller_glen,
2798 					  op_ctx->creds->caller_garray,
2799 					  op_ctx->client->addr.addr,
2800 					  op_ctx->client->addr.len);
2801 	#ifdef USE_GLUSTER_STAT_FETCH_API
2802 			retval = glfs_ftruncate(my_fd.glfd, attrib_set->filesize,
2803 						NULL, NULL);
2804 	#else
2805 			retval = glfs_ftruncate(my_fd.glfd, attrib_set->filesize);
2806 	#endif
2807 			SET_GLUSTER_CREDS(glfs_export, NULL, NULL, 0, NULL, NULL, 0);
2808 	
2809 			if (retval != 0) {
2810 				status = gluster2fsal_error(errno);
2811 				goto out;
2812 			}
2813 		}
2814 	
2815 		if (FSAL_TEST_MASK(attrib_set->valid_mask, ATTR_MODE)) {
2816 			FSAL_SET_MASK(mask, GLAPI_SET_ATTR_MODE);
2817 			buffxstat.buffstat.st_mode = fsal2unix_mode(attrib_set->mode);
2818 		}
2819 	
2820 		if (FSAL_TEST_MASK(attrib_set->valid_mask, ATTR_OWNER)) {
2821 			FSAL_SET_MASK(mask, GLAPI_SET_ATTR_UID);
2822 			buffxstat.buffstat.st_uid = attrib_set->owner;
2823 		}
2824 	
2825 		if (FSAL_TEST_MASK(attrib_set->valid_mask, ATTR_GROUP)) {
2826 			FSAL_SET_MASK(mask, GLAPI_SET_ATTR_GID);
2827 			buffxstat.buffstat.st_gid = attrib_set->group;
2828 		}
2829 	
2830 		if (FSAL_TEST_MASK(attrib_set->valid_mask, ATTR_ATIME)) {
2831 			FSAL_SET_MASK(mask, GLAPI_SET_ATTR_ATIME);
2832 			buffxstat.buffstat.st_atim = attrib_set->atime;
2833 		}
2834 	
2835 		if (FSAL_TEST_MASK(attrib_set->valid_mask, ATTR_ATIME_SERVER)) {
2836 			FSAL_SET_MASK(mask, GLAPI_SET_ATTR_ATIME);
2837 			struct timespec timestamp;
2838 	
2839 			retval = clock_gettime(CLOCK_REALTIME, &timestamp);
2840 			if (retval != 0) {
2841 				status = gluster2fsal_error(errno);
2842 				goto out;
2843 			}
2844 			buffxstat.buffstat.st_atim = timestamp;
2845 		}
2846 	
2847 		/* try to look at glfs_futimens() instead as done in vfs */
2848 		if (FSAL_TEST_MASK(attrib_set->valid_mask, ATTR_MTIME)) {
2849 			FSAL_SET_MASK(mask, GLAPI_SET_ATTR_MTIME);
2850 			buffxstat.buffstat.st_mtim = attrib_set->mtime;
2851 		}
2852 		if (FSAL_TEST_MASK(attrib_set->valid_mask, ATTR_MTIME_SERVER)) {
2853 			FSAL_SET_MASK(mask, GLAPI_SET_ATTR_MTIME);
2854 			struct timespec timestamp;
2855 	
2856 			retval = clock_gettime(CLOCK_REALTIME, &timestamp);
2857 			if (retval != 0) {
2858 				status = gluster2fsal_error(retval);
2859 				goto out;
2860 			}
2861 			buffxstat.buffstat.st_mtim = timestamp;
2862 		}
2863 	
2864 		/** @todo: Check for attributes not supported and return */
2865 		/* EATTRNOTSUPP error.  */
2866 	
2867 		if (NFSv4_ACL_SUPPORT) {
2868 			if (FSAL_TEST_MASK(attrib_set->valid_mask, ATTR_ACL) &&
2869 			    attrib_set->acl && attrib_set->acl->naces) {
2870 				if (obj_hdl->type == DIRECTORY)
2871 					buffxstat.is_dir = true;
2872 				else
2873 					buffxstat.is_dir = false;
2874 	
2875 				FSAL_SET_MASK(attr_valid, XATTR_ACL);
2876 				status =
2877 				  glusterfs_process_acl(glfs_export->gl_fs->fs,
2878 							myself->glhandle,
2879 							attrib_set, &buffxstat);
2880 	
2881 				if (FSAL_IS_ERROR(status))
2882 					goto out;
2883 				/* setting the ACL will set the */
2884 				/* mode-bits too if not already passed */
2885 				FSAL_SET_MASK(mask, GLAPI_SET_ATTR_MODE);
2886 			}
2887 		} else if (FSAL_TEST_MASK(attrib_set->valid_mask, ATTR_ACL)) {
2888 			status = fsalstat(ERR_FSAL_ATTRNOTSUPP, 0);
2889 			goto out;
2890 		}
2891 	
2892 		SET_GLUSTER_CREDS(glfs_export, &op_ctx->creds->caller_uid,
2893 				  &op_ctx->creds->caller_gid,
2894 				  op_ctx->creds->caller_glen,
2895 				  op_ctx->creds->caller_garray,
2896 				  op_ctx->client->addr.addr,
2897 				  op_ctx->client->addr.len);
2898 	
2899 		/* If any stat changed, indicate that */
2900 		if (mask != 0)
2901 			FSAL_SET_MASK(attr_valid, XATTR_STAT);
2902 		if (FSAL_TEST_MASK(attr_valid, XATTR_STAT)) {
2903 			/* Only if there is any change in attrs send them down to fs */
2904 			/** @todo: instead use glfs_fsetattr().... looks like there is
2905 			 * fix needed in there..it doesn't convert the mask flags
2906 			 * to corresponding gluster flags.
2907 			 */
2908 			retval = glfs_h_setattrs(glfs_export->gl_fs->fs,
2909 					     myself->glhandle,
2910 					     &buffxstat.buffstat,
2911 					     mask);
2912 			if (retval != 0) {
2913 				status = gluster2fsal_error(errno);
2914 				goto creds;
2915 			}
2916 		}
2917 	
2918 		if (FSAL_TEST_MASK(attr_valid, XATTR_ACL))
2919 			status = glusterfs_set_acl(glfs_export, myself, &buffxstat);
2920 	
2921 		if (FSAL_IS_ERROR(status)) {
2922 			LogDebug(COMPONENT_FSAL,
2923 				 "setting ACL failed");
2924 			goto creds;
2925 		}
2926 	
2927 		if (FSAL_TEST_MASK(attrib_set->valid_mask, ATTR4_SEC_LABEL) &&
2928 				op_ctx_export_has_option(EXPORT_OPTION_SECLABEL_SET)) {
2929 			retval = glfs_h_setxattrs(glfs_export->gl_fs->fs,
2930 					myself->glhandle,
2931 					glfs_export->sec_label_xattr,
2932 					attrib_set->sec_label.slai_data.slai_data_val,
2933 					attrib_set->sec_label.slai_data.slai_data_len,
2934 					0);
2935 			if (retval < 0) {
2936 				status = gluster2fsal_error(errno);
2937 				LogCrit(COMPONENT_FSAL,
2938 						"Error : seclabel failed with error %s",
2939 						strerror(errno));
2940 			}
2941 		}
2942 	
2943 	creds:
2944 		SET_GLUSTER_CREDS(glfs_export, NULL, NULL, 0, NULL, NULL, 0);
2945 	
2946 	out:
2947 		if (FSAL_IS_ERROR(status)) {
2948 			LogCrit(COMPONENT_FSAL,
2949 				 "setattrs failed with error %s",
2950 				 strerror(status.minor));
2951 		}
2952 	
2953 		if (glusterfs_fd)
2954 			PTHREAD_RWLOCK_unlock(&glusterfs_fd->fdlock);
2955 	
2956 		if (closefd)
2957 			glusterfs_close_my_fd(&my_fd);
2958 	
2959 		if (has_lock)
2960 			PTHREAD_RWLOCK_unlock(&obj_hdl->obj_lock);
2961 	
2962 		glusterfs_fsal_clean_xstat(&buffxstat);
2963 		return status;
2964 	}
2965 	
2966 	/* close2
2967 	 */
2968 	
2969 	static fsal_status_t glusterfs_close2(struct fsal_obj_handle *obj_hdl,
2970 					      struct state_t *state)
2971 	{
2972 		struct glusterfs_handle *myself = NULL;
2973 		fsal_status_t status = {0, 0};
2974 		struct glusterfs_fd *my_fd = &container_of(state,
2975 							   struct glusterfs_state_fd,
2976 							   state)->glusterfs_fd;
2977 	
2978 		myself = container_of(obj_hdl,
2979 				      struct glusterfs_handle,
2980 				      handle);
2981 	
2982 		if (state->state_type == STATE_TYPE_SHARE ||
2983 		    state->state_type == STATE_TYPE_NLM_SHARE ||
2984 		    state->state_type == STATE_TYPE_9P_FID) {
2985 			/* This is a share state, we must update the share counters */
2986 	
2987 			/* This can block over an I/O operation. */
2988 			PTHREAD_RWLOCK_wrlock(&obj_hdl->obj_lock);
2989 	
2990 			update_share_counters(&myself->share,
2991 					      my_fd->openflags,
2992 					      FSAL_O_CLOSED);
2993 	
2994 			PTHREAD_RWLOCK_unlock(&obj_hdl->obj_lock);
2995 		}
2996 	
2997 		/* Acquire state's fdlock to make sure no other thread
2998 		 * is operating on the fd while we close it.
2999 		 */
3000 		PTHREAD_RWLOCK_wrlock(&my_fd->fdlock);
3001 		status = glusterfs_close_my_fd(my_fd);
3002 		PTHREAD_RWLOCK_unlock(&my_fd->fdlock);
3003 	
3004 		return status;
3005 	}
3006 	
3007 	/**
3008 	 * @brief Implements GLUSTER FSAL objectoperation list_ext_attrs
3009 	 */
3010 	/*
3011 	static fsal_status_t list_ext_attrs(struct fsal_obj_handle *obj_hdl,
3012 					    const struct req_op_context *opctx,
3013 					    unsigned int cookie,
3014 					    fsal_xattrent_t * xattrs_tab,
3015 					    unsigned int xattrs_tabsize,
3016 					    unsigned int *p_nb_returned,
3017 					    int *end_of_list)
3018 	{
3019 		return fsalstat(ERR_FSAL_NOTSUPP, 0);
3020 	}
3021 	*/
3022 	/**
3023 	 * @brief Implements GLUSTER FSAL objectoperation getextattr_id_by_name
3024 	 */
3025 	/*
3026 	static fsal_status_t getextattr_id_by_name(struct fsal_obj_handle *obj_hdl,
3027 						   const struct req_op_context *opctx,
3028 						   const char *xattr_name,
3029 						   unsigned int *pxattr_id)
3030 	{
3031 		return fsalstat(ERR_FSAL_NOTSUPP, 0);
3032 	}
3033 	*/
3034 	/**
3035 	 * @brief Implements GLUSTER FSAL objectoperation getextattr_value_by_name
3036 	 */
3037 	/*
3038 	static fsal_status_t getextattr_value_by_name(struct fsal_obj_handle *obj_hdl,
3039 						      const struct
3040 						      req_op_context *opctx,
3041 						      const char *xattr_name,
3042 						      void *buffer_addr,
3043 						      size_t buffer_size,
3044 						      size_t * p_output_size)
3045 	{
3046 		return fsalstat(ERR_FSAL_NOTSUPP, 0);
3047 	}
3048 	*/
3049 	/**
3050 	 * @brief Implements GLUSTER FSAL objectoperation getextattr_value_by_id
3051 	 */
3052 	/*
3053 	static fsal_status_t getextattr_value_by_id(struct fsal_obj_handle *obj_hdl,
3054 						    const struct req_op_context *opctx,
3055 						    unsigned int xattr_id,
3056 						    void *buffer_addr,
3057 						    size_t buffer_size,
3058 						    size_t *p_output_size)
3059 	{
3060 		return fsalstat(ERR_FSAL_NOTSUPP, 0);
3061 	}
3062 	*/
3063 	/**
3064 	 * @brief Implements GLUSTER FSAL objectoperation setextattr_value
3065 	 */
3066 	/*
3067 	static fsal_status_t setextattr_value(struct fsal_obj_handle *obj_hdl,
3068 					      const struct req_op_context *opctx,
3069 					      const char *xattr_name,
3070 					      void *buffer_addr,
3071 					      size_t buffer_size,
3072 					      int create)
3073 	{
3074 		return fsalstat(ERR_FSAL_NOTSUPP, 0);
3075 	}
3076 	*/
3077 	/**
3078 	 * @brief Implements GLUSTER FSAL objectoperation setextattr_value_by_id
3079 	 */
3080 	/*
3081 	static fsal_status_t setextattr_value_by_id(struct fsal_obj_handle *obj_hdl,
3082 						    const struct req_op_context *opctx,
3083 						    unsigned int xattr_id,
3084 						    void *buffer_addr,
3085 						    size_t buffer_size)
3086 	{
3087 		return fsalstat(ERR_FSAL_NOTSUPP, 0);
3088 	}
3089 	*/
3090 	/**
3091 	 * @brief Implements GLUSTER FSAL objectoperation getextattr_attrs
3092 	 */
3093 	/*
3094 	static fsal_status_t getextattr_attrs(struct fsal_obj_handle *obj_hdl,
3095 					      const struct req_op_context *opctx,
3096 					      unsigned int xattr_id,
3097 					      struct attrlist* p_attrs)
3098 	{
3099 		return fsalstat(ERR_FSAL_NOTSUPP, 0);
3100 	}
3101 	*/
3102 	/**
3103 	 * @brief Implements GLUSTER FSAL objectoperation remove_extattr_by_id
3104 	 */
3105 	/*
3106 	static fsal_status_t remove_extattr_by_id(struct fsal_obj_handle *obj_hdl,
3107 						  const struct req_op_context *opctx,
3108 						  unsigned int xattr_id)
3109 	{
3110 		return fsalstat(ERR_FSAL_NOTSUPP, 0);
3111 	}
3112 	*/
3113 	/**
3114 	 * @brief Implements GLUSTER FSAL objectoperation remove_extattr_by_name
3115 	 */
3116 	/*
3117 	static fsal_status_t remove_extattr_by_name(struct fsal_obj_handle *obj_hdl,
3118 						    const struct req_op_context *opctx,
3119 						    const char *xattr_name)
3120 	{
3121 		return fsalstat(ERR_FSAL_NOTSUPP, 0);
3122 	}
3123 	*/
3124 	
3125 	/**
3126 	 * @brief Implements GLUSTER FSAL objectoperation handle_to_wire
3127 	 */
3128 	
3129 	static fsal_status_t handle_to_wire(const struct fsal_obj_handle *obj_hdl,
3130 					    fsal_digesttype_t output_type,
3131 					    struct gsh_buffdesc *fh_desc)
3132 	{
3133 		fsal_status_t status = { ERR_FSAL_NO_ERROR, 0 };
3134 		size_t fh_size;
3135 		struct glusterfs_handle *objhandle;
3136 	#ifdef GLTIMING
3137 		struct timespec s_time, e_time;
3138 	
3139 		now(&s_time);
3140 	#endif
3141 	
3142 		if (!fh_desc)
3143 			return fsalstat(ERR_FSAL_FAULT, 0);
3144 	
3145 		objhandle = container_of(obj_hdl, struct glusterfs_handle, handle);
3146 	
3147 		switch (output_type) {
3148 		case FSAL_DIGEST_NFSV3:
3149 		case FSAL_DIGEST_NFSV4:
3150 			fh_size = GLAPI_HANDLE_LENGTH;
3151 			if (fh_desc->len < fh_size) {
3152 				LogMajor(COMPONENT_FSAL,
3153 					 "Space too small for handle.  need %zu, have %zu",
3154 					 fh_size, fh_desc->len);
3155 				status.major = ERR_FSAL_TOOSMALL;
3156 				goto out;
3157 			}
3158 			memcpy(fh_desc->addr, objhandle->globjhdl, fh_size);
3159 			break;
3160 		default:
3161 			status.major = ERR_FSAL_SERVERFAULT;
3162 			goto out;
3163 		}
3164 	
3165 		fh_desc->len = fh_size;
3166 	 out:
3167 	#ifdef GLTIMING
3168 		now(&e_time);
3169 		latency_update(&s_time, &e_time, lat_handle_to_wire);
3170 	#endif
3171 		return status;
3172 	}
3173 	
3174 	/**
3175 	 * @brief Implements GLUSTER FSAL objectoperation handle_to_key
3176 	 */
3177 	
3178 	static void handle_to_key(struct fsal_obj_handle *obj_hdl,
3179 				  struct gsh_buffdesc *fh_desc)
3180 	{
3181 		struct glusterfs_handle *objhandle;
3182 	#ifdef GLTIMING
3183 		struct timespec s_time, e_time;
3184 	
3185 		now(&s_time);
3186 	#endif
3187 	
3188 		objhandle = container_of(obj_hdl, struct glusterfs_handle, handle);
3189 		fh_desc->addr = objhandle->globjhdl;
3190 		fh_desc->len = GLAPI_HANDLE_LENGTH;
3191 	
3192 	#ifdef GLTIMING
3193 		now(&e_time);
3194 		latency_update(&s_time, &e_time, lat_handle_to_key);
3195 	#endif
3196 	}
3197 	
3198 	/**
3199 	 * @brief Registers GLUSTER FSAL objectoperation vector
3200 	 */
3201 	
3202 	void handle_ops_init(struct fsal_obj_ops *ops)
3203 	{
3204 		fsal_default_obj_ops_init(ops);
3205 	
3206 		ops->release = handle_release;
3207 		ops->merge = glusterfs_merge;
3208 		ops->lookup = lookup;
3209 		ops->mkdir = makedir;
3210 		ops->mknode = makenode;
3211 		ops->readdir = read_dirents;
3212 		ops->symlink = makesymlink;
3213 		ops->readlink = readsymlink;
3214 		ops->getattrs = getattrs;
3215 		ops->link = linkfile;
3216 		ops->rename = renamefile;
3217 		ops->unlink = file_unlink;
3218 		ops->handle_to_wire = handle_to_wire;
3219 		ops->handle_to_key = handle_to_key;
3220 		ops->close = file_close;
3221 	
3222 		/* fops with OpenTracking (multi-fd) enabled */
3223 		ops->open2 = glusterfs_open2;
3224 		ops->reopen2 = glusterfs_reopen2;
3225 		ops->read2 = glusterfs_read2;
3226 		ops->write2 = glusterfs_write2;
3227 		ops->commit2 = glusterfs_commit2;
3228 		ops->lock_op2 = glusterfs_lock_op2;
3229 		ops->setattr2 = glusterfs_setattr2;
3230 		ops->close2 = glusterfs_close2;
3231 		ops->seek2 = seek2;
3232 	#ifdef USE_GLUSTER_DELEGATION
3233 		ops->lease_op2 = glusterfs_lease_op2;
3234 	#endif
3235 	
3236 		/* pNFS related ops */
3237 		handle_ops_pnfs(ops);
3238 	}
3239