1    	/*
2    	 * vim:noexpandtab:shiftwidth=8:tabstop=8:
3    	 *
4    	 * Copyright (C) Red Hat  Inc., 2013
5    	 * Author: Anand Subramanian anands@redhat.com
6    	 *
7    	 * This program is free software; you can redistribute it and/or
8    	 * modify it under the terms of the GNU Lesser General Public
9    	 * License as published by the Free Software Foundation; either
10   	 * version 3 of the License, or (at your option) any later version.
11   	 *
12   	 * This program is distributed in the hope that it will be useful,
13   	 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14   	 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15   	 * Lesser General Public License for more details.
16   	 *
17   	 * You should have received a copy of the GNU Lesser General Public
18   	 * License along with this library; if not, write to the Free Software
19   	 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301 USA
20   	 *
21   	 * -------------
22   	 */
23   	
24   	/**
25   	 * @file  export.c
26   	 * @author Shyamsundar R <srangana@redhat.com>
27   	 * @author Anand Subramanian <anands@redhat.com>
28   	 *
29   	 * @brief GLUSTERFS FSAL export object
30   	 */
31   	
32   	#include <fcntl.h>
33   	#include <unistd.h>
34   	#include <sys/types.h>
35   	#include <pthread.h>
36   	#include "fsal.h"
37   	#include "FSAL/fsal_config.h"
38   	#include "fsal_convert.h"
39   	#include "config_parsing.h"
40   	#include "gluster_internal.h"
41   	#include "nfs_exports.h"
42   	#include "export_mgr.h"
43   	#include "pnfs_utils.h"
44   	#include "sal_data.h"
45   	
/*
 * Fallback gfapi log file, used as the default value of the
 * "glfs_log" export parameter when the export block does not set it.
 */
#define GFAPI_LOG_LOCATION "/var/log/ganesha/ganesha-gfapi.log"
50   	
51   	/**
52   	 * @brief Implements GLUSTER FSAL exportoperation release
53   	 */
54   	
55   	static void export_release(struct fsal_export *exp_hdl)
56   	{
57   		struct glusterfs_export *glfs_export =
58   		    container_of(exp_hdl, struct glusterfs_export, export);
59   	
60   		/* check activity on the export */
61   	
62   		/* detach the export */
63   		fsal_detach_export(glfs_export->export.fsal,
64   				   &glfs_export->export.exports);
65   		free_export_ops(&glfs_export->export);
66   	
67   		glusterfs_free_fs(glfs_export->gl_fs);
68   	
69   		glfs_export->gl_fs = NULL;
70   		gsh_free(glfs_export->export_path);
71   		glfs_export->export_path = NULL;
72   		gsh_free(glfs_export->sec_label_xattr);
73   		glfs_export->sec_label_xattr = NULL;
74   		gsh_free(glfs_export);
75   		glfs_export = NULL;
76   	}
77   	
78   	/**
79   	 * @brief Implements GLUSTER FSAL exportoperation lookup_path
80   	 */
81   	
82   	static fsal_status_t lookup_path(struct fsal_export *export_pub,
83   					 const char *path,
84   					 struct fsal_obj_handle **pub_handle,
85   					 struct attrlist *attrs_out)
86   	{
87   		int rc = 0;
88   		fsal_status_t status = { ERR_FSAL_NO_ERROR, 0 };
89   		char *realpath = NULL;
90   		struct stat sb;
91   		struct glfs_object *glhandle = NULL;
92   		unsigned char globjhdl[GFAPI_HANDLE_LENGTH] = {'\0'};
93   		struct glusterfs_handle *objhandle = NULL;
94   		struct glusterfs_export *glfs_export =
95   		    container_of(export_pub, struct glusterfs_export, export);
96   		char vol_uuid[GLAPI_UUID_LENGTH] = {'\0'};
97   	
98   		LogFullDebug(COMPONENT_FSAL, "In args: path = %s", path);
99   	
100  		*pub_handle = NULL;
101  	
102  		if (strcmp(path, glfs_export->mount_path) == 0) {
103  			realpath = gsh_strdup(glfs_export->export_path);
104  		} else {
105  			/*
106  			 *  mount path is not same as the exported one. Should be subdir
107  			 *  then.
108  			 */
109  			/** @todo: How do we handle symlinks if present in the path.
110  			 */
111  			realpath = gsh_malloc(strlen(glfs_export->export_path) +
112  					      strlen(path) + 1);
113  			/*
114  			 * Handle the case wherein glfs_export->export_path
115  			 * is root i.e, '/' separately.
116  			 */
117  			if (strlen(glfs_export->export_path) != 1) {
118  				strcpy(realpath, glfs_export->export_path);
119  				strcpy((realpath +
120  					strlen(glfs_export->export_path)),
121  					&path[strlen(glfs_export->mount_path)]);
122  			} else {
123  				strcpy(realpath,
124  					&path[strlen(glfs_export->mount_path)]);
125  			}
126  		}
127  	
128  		glhandle = glfs_h_lookupat(glfs_export->gl_fs->fs, NULL, realpath,
129  					&sb, 1);
130  		if (glhandle == NULL) {
131  			status = gluster2fsal_error(errno);
132  			goto out;
133  		}
134  	
135  		rc = glfs_h_extract_handle(glhandle, globjhdl, GFAPI_HANDLE_LENGTH);
136  		if (rc < 0) {
137  			status = gluster2fsal_error(errno);
138  			goto out;
139  		}
140  	
141  		rc = glfs_get_volumeid(glfs_export->gl_fs->fs, vol_uuid,
142  				       GLAPI_UUID_LENGTH);
143  		if (rc < 0) {
144  			status = gluster2fsal_error(errno);
145  			goto out;
146  		}
147  	
148  		construct_handle(glfs_export, &sb, glhandle, globjhdl,
149  				 &objhandle, vol_uuid);
150  	
151  		if (attrs_out != NULL) {
152  			posix2fsal_attributes_all(&sb, attrs_out);
153  		}
154  	
155  		*pub_handle = &objhandle->handle;
156  	
157  		gsh_free(realpath);
158  	
159  		return status;
160  	 out:
161  		gluster_cleanup_vars(glhandle);
162  		gsh_free(realpath);
163  	
164  		return status;
165  	}
166  	
167  	/**
168  	 * @brief Implements GLUSTER FSAL exportoperation wire_to_host
169  	 */
170  	
171  	static fsal_status_t wire_to_host(struct fsal_export *exp_hdl,
172  					  fsal_digesttype_t in_type,
173  					  struct gsh_buffdesc *fh_desc,
174  					  int flags)
175  	{
176  		size_t fh_size;
177  	#ifdef GLTIMING
178  		struct timespec s_time, e_time;
179  	
180  		now(&s_time);
181  	#endif
182  	
183  		/* sanity checks */
184  		if (!fh_desc || !fh_desc->addr)
185  			return fsalstat(ERR_FSAL_FAULT, 0);
186  	
187  		fh_size = GLAPI_HANDLE_LENGTH;
188  		if (fh_desc->len != fh_size) {
189  			LogMajor(COMPONENT_FSAL,
190  				 "Size mismatch for handle.  should be %zu, got %zu",
191  				 fh_size, fh_desc->len);
192  			return fsalstat(ERR_FSAL_SERVERFAULT, 0);
193  		}
194  	
195  		fh_desc->len = fh_size;	/* pass back the actual size */
196  	
197  	#ifdef GLTIMING
198  		now(&e_time);
199  		latency_update(&s_time, &e_time, lat_wire_to_host);
200  	#endif
201  		return fsalstat(ERR_FSAL_NO_ERROR, 0);
202  	}
203  	
204  	/**
205  	 * @brief Implements GLUSTER FSAL exportoperation create_handle
206  	 */
207  	
208  	static fsal_status_t create_handle(struct fsal_export *export_pub,
209  					   struct gsh_buffdesc *fh_desc,
210  					   struct fsal_obj_handle **pub_handle,
211  					   struct attrlist *attrs_out)
212  	{
213  		int rc = 0;
214  		fsal_status_t status = { ERR_FSAL_NO_ERROR, 0 };
215  		struct stat sb;
216  		struct glfs_object *glhandle = NULL;
217  		unsigned char globjhdl[GFAPI_HANDLE_LENGTH] = {'\0'};
218  		struct glusterfs_handle *objhandle = NULL;
219  		struct glusterfs_export *glfs_export =
220  		    container_of(export_pub, struct glusterfs_export, export);
221  		char vol_uuid[GLAPI_UUID_LENGTH] = {'\0'};
222  	#ifdef GLTIMING
223  		struct timespec s_time, e_time;
224  	
225  		now(&s_time);
226  	#endif
227  	
228  		*pub_handle = NULL;
229  	
230  		if (fh_desc->len != GLAPI_HANDLE_LENGTH) {
231  			status.major = ERR_FSAL_INVAL;
232  			goto out;
233  		}
234  	
235  		/* First 16bytes contain volume UUID. globjhdl is in the second */
236  		/* half(16bytes) of the fs_desc->addr.  */
237  		memcpy(globjhdl, fh_desc->addr+GLAPI_UUID_LENGTH, GFAPI_HANDLE_LENGTH);
238  	
239  		glhandle =
240  		    glfs_h_create_from_handle(glfs_export->gl_fs->fs, globjhdl,
241  					      GFAPI_HANDLE_LENGTH, &sb);
242  		if (glhandle == NULL) {
243  			status = gluster2fsal_error(errno);
244  			goto out;
245  		}
246  	
247  		rc = glfs_get_volumeid(glfs_export->gl_fs->fs, vol_uuid,
248  				       GLAPI_UUID_LENGTH);
249  		if (rc < 0) {
250  			status = gluster2fsal_error(errno);
251  			goto out;
252  		}
253  	
254  		construct_handle(glfs_export, &sb, glhandle, globjhdl,
255  				 &objhandle, vol_uuid);
256  	
257  		if (attrs_out != NULL) {
258  			posix2fsal_attributes_all(&sb, attrs_out);
259  		}
260  	
261  		*pub_handle = &objhandle->handle;
262  	 out:
263  		if (status.major != ERR_FSAL_NO_ERROR)
264  			gluster_cleanup_vars(glhandle);
265  	#ifdef GLTIMING
266  		now(&e_time);
267  		latency_update(&s_time, &e_time, lat_create_handle);
268  	#endif
269  		return status;
270  	}
271  	
272  	/**
273  	 * @brief Given a glfs_object handle, construct handle for
274  	 * FSAL to use.
275  	 */
276  	
277  	fsal_status_t glfs2fsal_handle(struct glusterfs_export *glfs_export,
278  				       struct glfs_object *glhandle,
279  				       struct fsal_obj_handle **pub_handle,
280  				       struct stat *sb,
281  				       struct attrlist *attrs_out)
282  	{
283  		int rc = 0;
284  		fsal_status_t status = { ERR_FSAL_NO_ERROR, 0 };
285  		unsigned char globjhdl[GFAPI_HANDLE_LENGTH] = {'\0'};
286  		struct glusterfs_handle *objhandle = NULL;
287  		char vol_uuid[GLAPI_UUID_LENGTH] = {'\0'};
288  	#ifdef GLTIMING
289  		struct timespec s_time, e_time;
290  	
291  		now(&s_time);
292  	#endif
293  	
294  		*pub_handle = NULL;
295  	
296  		if (!glfs_export || !glhandle) {
297  			status.major = ERR_FSAL_INVAL;
298  			goto out;
299  		}
300  	
301  		rc = glfs_h_extract_handle(glhandle, globjhdl, GFAPI_HANDLE_LENGTH);
302  		if (rc < 0) {
303  			status = gluster2fsal_error(errno);
304  			goto out;
305  		}
306  		rc = glfs_get_volumeid(glfs_export->gl_fs->fs, vol_uuid,
307  					 GLAPI_UUID_LENGTH);
308  		if (rc < 0) {
309  			status = gluster2fsal_error(errno);
310  			goto out;
311  		}
312  	
313  		construct_handle(glfs_export, sb, glhandle, globjhdl,
314  				 &objhandle, vol_uuid);
315  	
316  		if (attrs_out != NULL) {
317  			posix2fsal_attributes_all(sb, attrs_out);
318  		}
319  	
320  		*pub_handle = &objhandle->handle;
321  	 out:
322  	#ifdef GLTIMING
323  		now(&e_time);
324  		latency_update(&s_time, &e_time, lat_create_handle);
325  	#endif
326  		return status;
327  	}
328  	
329  	/**
330  	 * @brief Implements GLUSTER FSAL exportoperation get_fs_dynamic_info
331  	 */
332  	
333  	static fsal_status_t get_dynamic_info(struct fsal_export *exp_hdl,
334  					      struct fsal_obj_handle *obj_hdl,
335  					      fsal_dynamicfsinfo_t *infop)
336  	{
337  		int rc = 0;
338  		fsal_status_t status = { ERR_FSAL_NO_ERROR, 0 };
339  		struct statvfs vfssb;
340  		struct glusterfs_export *glfs_export =
341  		    container_of(exp_hdl, struct glusterfs_export, export);
342  	
343  		rc = glfs_statvfs(glfs_export->gl_fs->fs, glfs_export->export_path,
344  				  &vfssb);
345  		if (rc != 0)
346  			return gluster2fsal_error(errno);
347  	
348  		memset(infop, 0, sizeof(fsal_dynamicfsinfo_t));
349  		infop->total_bytes = vfssb.f_frsize * vfssb.f_blocks;
350  		infop->free_bytes = vfssb.f_frsize * vfssb.f_bfree;
351  		infop->avail_bytes = vfssb.f_frsize * vfssb.f_bavail;
352  		infop->total_files = vfssb.f_files;
353  		infop->free_files = vfssb.f_ffree;
354  		infop->avail_files = vfssb.f_favail;
355  		infop->time_delta.tv_sec = 1;
356  		infop->time_delta.tv_nsec = 0;
357  	
358  		return status;
359  	}
360  	
361  	/**
362  	 * @brief Allocate a state_t structure
363  	 *
364  	 * Note that this is not expected to fail since memory allocation is
365  	 * expected to abort on failure.
366  	 *
367  	 * @param[in] exp_hdl	       Export state_t will be associated with
368  	 * @param[in] state_type	    Type of state to allocate
369  	 * @param[in] related_state         Related state if appropriate
370  	 *
371  	 * @returns a state structure.
372  	 */
373  	struct state_t *glusterfs_alloc_state(struct fsal_export *exp_hdl,
374  					enum state_type state_type,
375  					struct state_t *related_state)
376  	{
377  		struct state_t *state;
378  		struct glusterfs_fd *my_fd;
379  	
(4) Event example_assign: Example 2: Assigning: "p_" = return value from "calloc(1UL, 336UL)".
(5) Event example_checked: Example 2 (cont.): "p_" has its value checked in "p_ == NULL".
Also see events: [returned_null][example_assign][example_checked][example_assign][example_checked][example_assign][example_checked][example_assign][example_checked][var_assigned][dereference]
380  		state = init_state(gsh_calloc(1, sizeof(struct glusterfs_state_fd)),
381  				   exp_hdl, state_type, related_state);
382  	
383  		my_fd = &container_of(state, struct glusterfs_state_fd,
384  				      state)->glusterfs_fd;
385  	
386  		my_fd->glfd = NULL;
387  		my_fd->openflags = FSAL_O_CLOSED;
388  		PTHREAD_RWLOCK_init(&my_fd->fdlock, NULL);
389  	
390  		return state;
391  	}
392  	
393  	/**
394  	 * @brief free a gluster_state_fd structure
395  	 *
396  	 * @param[in] exp_hdl  Export state_t will be associated with
397  	 * @param[in] state    Related state if appropriate
398  	 *
399  	 */
400  	void glusterfs_free_state(struct fsal_export *exp_hdl, struct state_t *state)
401  	{
402  		struct glusterfs_state_fd *state_fd =
403  			container_of(state, struct glusterfs_state_fd, state);
404  		struct glusterfs_fd *my_fd = &state_fd->glusterfs_fd;
405  	
406  		PTHREAD_RWLOCK_destroy(&my_fd->fdlock);
407  	
408  		gsh_free(state_fd);
409  	}
410  	
411  	/** @todo: We have gone POSIX way for the APIs below, can consider the CEPH way
412  	 * in case all are constants across all volumes etc.
413  	 */
414  	
415  	/**
416  	 * @brief Implements GLUSTER FSAL exportoperation fs_supported_attrs
417  	 */
418  	
419  	static attrmask_t fs_supported_attrs(struct fsal_export *exp_hdl)
420  	{
421  		attrmask_t supported_mask;
422  	
423  		supported_mask = fsal_supported_attrs(&exp_hdl->fsal->fs_info);
424  		if (!NFSv4_ACL_SUPPORT)
425  			supported_mask &= ~ATTR_ACL;
426  		return supported_mask;
427  	}
428  	
429  	/**
430  	 * @brief Implements GLUSTER FSAL exportoperation check_quota
431  	 */
432  	/*
433  	static fsal_status_t check_quota(struct fsal_export *exp_hdl,
434  					 const char * filepath,
435  					 int quota_type,
436  					 struct req_op_context *req_ctx)
437  	{
438  		return fsalstat(ERR_FSAL_NO_ERROR, 0) ;
439  	}
440  	*/
441  	/**
442  	 * @brief Implements GLUSTER FSAL exportoperation get_quota
443  	 */
444  	/*
445  	static fsal_status_t get_quota(struct fsal_export *exp_hdl,
446  				       const char * filepath,
447  				       int quota_type,
448  				       struct req_op_context *req_ctx,
449  				       fsal_quota_t *pquota)
450  	{
451  		return fsalstat(ERR_FSAL_NOTSUPP, 0) ;
452  	}
453  	*/
454  	/**
455  	 * @brief Implements GLUSTER FSAL exportoperation set_quota
456  	 */
457  	/*
458  	static fsal_status_t set_quota(struct fsal_export *exp_hdl,
459  				       const char *filepath,
460  				       int quota_type,
461  				       struct req_op_context *req_ctx,
462  				       fsal_quota_t * pquota,
463  				       fsal_quota_t * presquota)
464  	{
465  		return fsalstat(ERR_FSAL_NOTSUPP, 0) ;
466  	}
467  	*/
468  	
469  	/**
470  	 * @brief Registers GLUSTER FSAL exportoperation vector
471  	 *
472  	 * This function overrides operations that we've implemented, leaving
473  	 * the rest for the default.
474  	 *
475  	 * @param[in,out] ops Operations vector
476  	 */
477  	
478  	void export_ops_init(struct export_ops *ops)
479  	{
480  		ops->release = export_release;
481  		ops->lookup_path = lookup_path;
482  		ops->wire_to_host = wire_to_host;
483  		ops->create_handle = create_handle;
484  		ops->get_fs_dynamic_info = get_dynamic_info;
485  		ops->fs_supported_attrs = fs_supported_attrs;
486  		ops->alloc_state = glusterfs_alloc_state;
487  		ops->free_state = glusterfs_free_state;
488  	}
489  	
490  	enum transport {
491  		GLUSTER_TCP_VOL,
492  		GLUSTER_RDMA_VOL
493  	};
494  	
495  	struct glexport_params {
496  		char *glvolname;
497  		char *glhostname;
498  		char *glvolpath;
499  		char *glfs_log;
500  		uint64_t up_poll_usec;
501  		bool enable_upcall;
502  		enum transport gltransport;
503  		char *sec_label_xattr;
504  	};
505  	
506  	static struct config_item_list transportformats[] = {
507  		CONFIG_LIST_TOK("tcp", GLUSTER_TCP_VOL),
508  		CONFIG_LIST_TOK("rdma", GLUSTER_RDMA_VOL),
509  		CONFIG_LIST_EOL
510  	};
511  	
512  	static struct config_item export_params[] = {
513  		CONF_ITEM_NOOP("name"),
514  		CONF_MAND_STR("volume", 1, MAXPATHLEN, NULL,
515  			      glexport_params, glvolname),
516  		CONF_MAND_STR("hostname", 1, MAXPATHLEN, NULL,
517  			      glexport_params, glhostname),
518  		CONF_ITEM_PATH("volpath", 1, MAXPATHLEN, "/",
519  			      glexport_params, glvolpath),
520  		CONF_ITEM_PATH("glfs_log", 1, MAXPATHLEN, GFAPI_LOG_LOCATION,
521  			       glexport_params, glfs_log),
522  		CONF_ITEM_UI64("up_poll_usec", 1, 60*1000*1000, 10,
523  			       glexport_params, up_poll_usec),
524  		CONF_ITEM_BOOL("enable_upcall", true, glexport_params,
525  			       enable_upcall),
526  		CONF_ITEM_TOKEN("transport", GLUSTER_TCP_VOL, transportformats,
527  				glexport_params, gltransport),
528  		CONF_ITEM_STR("sec_label_xattr", 0, 256, "security.selinux",
529  				glexport_params, sec_label_xattr),
530  		CONFIG_EOL
531  	};
532  	
533  	
534  	static struct config_block export_param = {
535  		.dbus_interface_name = "org.ganesha.nfsd.config.fsal.gluster-export%d",
536  		.blk_desc.name = "FSAL",
537  		.blk_desc.type = CONFIG_BLOCK,
538  		.blk_desc.u.blk.init = noop_conf_init,
539  		.blk_desc.u.blk.params = export_params,
540  		.blk_desc.u.blk.commit = noop_conf_commit
541  	};
542  	
543  	/*
544  	 * Given glusterfs_fs object, decrement the refcount. In case if it
545  	 * becomes zero, free the resources.
546  	 */
547  	void
548  	glusterfs_free_fs(struct glusterfs_fs *gl_fs)
549  	{
550  		int64_t refcnt;
551  		int err     = 0;
552  	
553  		PTHREAD_MUTEX_lock(&GlusterFS.lock);
554  	
555  		refcnt = --(gl_fs->refcnt);
556  	
557  		assert(refcnt >= 0);
558  	
559  		if (refcnt) {
560  			LogDebug(COMPONENT_FSAL,
561  				 "There are still (%ld)active shares for volume(%s)",
562  				 gl_fs->refcnt, gl_fs->volname);
563  			PTHREAD_MUTEX_unlock(&GlusterFS.lock);
564  			return;
565  		}
566  	
567  		glist_del(&gl_fs->fs_obj);
568  		PTHREAD_MUTEX_unlock(&GlusterFS.lock);
569  	
570  		atomic_inc_int8_t(&gl_fs->destroy_mode);
571  	
572  		if (!gl_fs->enable_upcall)
573  			goto skip_upcall;
574  	
575  		/* Cancel upcall readiness if not yet done */
576  		up_ready_cancel((struct fsal_up_vector *)gl_fs->up_ops);
577  	
578  	#ifndef USE_GLUSTER_UPCALL_REGISTER
579  		int *retval = NULL;
580  		/* Wait for up_thread to exit */
581  		err = pthread_join(gl_fs->up_thread, (void **)&retval);
582  	
583  		if (retval && *retval) {
584  			LogDebug(COMPONENT_FSAL, "Up_thread join returned value %d",
585  				 *retval);
586  		}
587  	
588  		if (err) {
589  			LogWarn(COMPONENT_FSAL, "Up_thread join failed (%s)",
590  				strerror(err));
591  		}
592  	#else
593  		err = glfs_upcall_unregister(gl_fs->fs, GLFS_EVENT_ANY);
594  	
595  		if ((err < 0) || (!(err & GLFS_EVENT_INODE_INVALIDATE))) {
596  			/* Or can we ignore the error like in case of single
597  			 * node ganesha server. */
598  			LogWarn(COMPONENT_FSAL,
599  				"Unable to unregister for upcalls. Volume: %s",
600  				gl_fs->volname);
601  		}
602  	#endif
603  	
604  	skip_upcall:
605  		/* Gluster and memory cleanup */
606  		glfs_fini(gl_fs->fs);
607  		gsh_free(gl_fs->volname);
608  		gsh_free(gl_fs);
609  	}
610  	
611  	/**
612  	 * @brief Given Gluster export params, find and return if there is
613  	 * already existing export entry. If not create one.
614  	 */
615  	struct glusterfs_fs*
616  	glusterfs_get_fs(struct glexport_params params,
617  			 const struct fsal_up_vector *up_ops)
618  	{
619  		int rc = 0;
620  		struct glusterfs_fs *gl_fs = NULL;
621  		glfs_t  *fs = NULL;
622  		struct glist_head *glist, *glistn;
623  	
624  		PTHREAD_MUTEX_lock(&GlusterFS.lock);
625  	
626  		glist_for_each_safe(glist, glistn, &GlusterFS.fs_obj) {
627  			gl_fs = glist_entry(glist, struct glusterfs_fs,
628  					    fs_obj);
629  			if (!strcmp(params.glvolname, gl_fs->volname)) {
630  				goto found;
631  			}
632  		}
633  	
634  		gl_fs = gsh_calloc(1, sizeof(struct glusterfs_fs));
635  	
636  		glist_init(&gl_fs->fs_obj);
637  	
638  		fs = glfs_new(params.glvolname);
639  		if (!fs) {
640  			LogCrit(COMPONENT_FSAL,
641  				"Unable to create new glfs. Volume: %s",
642  				params.glvolname);
643  			goto out;
644  		}
645  	
646  		switch (params.gltransport) {
647  		case GLUSTER_RDMA_VOL:
648  			rc = glfs_set_volfile_server(fs, "rdma", params.glhostname,
649  					24007);
650  			break;
651  		default:
652  			rc = glfs_set_volfile_server(fs, "tcp", params.glhostname,
653  					24007);
654  			break;
655  		}
656  	
657  		if (rc != 0) {
658  			LogCrit(COMPONENT_FSAL,
659  				"Unable to set volume file. Volume: %s",
660  				params.glvolname);
661  			goto out;
662  		}
663  	
664  		rc = glfs_set_logging(fs, params.glfs_log, 7);
665  		if (rc != 0) {
666  			LogCrit(COMPONENT_FSAL,
667  				"Unable to set logging. Volume: %s",
668  				params.glvolname);
669  			goto out;
670  		}
671  	
672  		rc = glfs_init(fs);
673  		if (rc != 0) {
674  			LogCrit(COMPONENT_FSAL,
675  				"Unable to initialize volume. Volume: %s",
676  				params.glvolname);
677  			goto out;
678  		}
679  	
680  		gl_fs->fs = fs;
681  		gl_fs->volname = strdup(params.glvolname);
682  		gl_fs->destroy_mode = 0;
683  		gl_fs->up_poll_usec = params.up_poll_usec;
684  	
685  		gl_fs->up_ops = up_ops;
686  	
687  		gl_fs->enable_upcall = params.enable_upcall;
688  	
689  		if (!gl_fs->enable_upcall)
690  			goto skip_upcall;
691  	
692  	#ifndef USE_GLUSTER_UPCALL_REGISTER
693  		rc = initiate_up_thread(gl_fs);
694  		if (rc != 0) {
695  			LogCrit(COMPONENT_FSAL,
696  				"Unable to create GLUSTERFSAL_UP_Thread. Volume: %s",
697  				params.glvolname);
698  			goto out;
699  		}
700  	#else
701  		/* We are mainly interested in INODE_INVALIDATE for now. Still
702  		 * register for all the events
703  		 */
704  		rc = glfs_upcall_register(fs, GLFS_EVENT_ANY, gluster_process_upcall,
705  					  gl_fs);
706  	
707  		if ((rc < 0) || (!(rc & GLFS_EVENT_INODE_INVALIDATE))) {
708  			/* Or can we ignore the error like in case of single
709  			 * node ganesha server. */
710  			LogCrit(COMPONENT_FSAL,
711  				"Unable to register for upcalls. Volume: %s",
712  				params.glvolname);
713  			goto out;
714  		}
715  	#endif
716  	
717  	skip_upcall:
718  		glist_add(&GlusterFS.fs_obj, &gl_fs->fs_obj);
719  	
720  	found:
721  		++(gl_fs->refcnt);
722  		PTHREAD_MUTEX_unlock(&GlusterFS.lock);
723  		return gl_fs;
724  	
725  	out:
726  		PTHREAD_MUTEX_unlock(&GlusterFS.lock);
727  		if (fs)
728  			glfs_fini(fs);
729  	
730  		glist_del(&gl_fs->fs_obj); /* not needed atm */
731  		gsh_free(gl_fs);
732  	
733  		return NULL;
734  	}
735  	
736  	/**
737  	 * @brief Implements GLUSTER FSAL moduleoperation create_export
738  	 */
739  	
740  	fsal_status_t glusterfs_create_export(struct fsal_module *fsal_hdl,
741  					      void *parse_node,
742  					      struct config_error_type *err_type,
743  					      const struct fsal_up_vector *up_ops)
744  	{
745  		int rc = 0;
746  		fsal_status_t status = { ERR_FSAL_NO_ERROR, 0 };
747  		struct glusterfs_export *glfsexport = NULL;
748  		bool fsal_attached = false;
749  		struct glexport_params params = {
750  			.glvolname = NULL,
751  			.glhostname = NULL,
752  			.glvolpath = NULL,
753  			.glfs_log = NULL};
754  	
755  		LogDebug(COMPONENT_FSAL, "In args: export path = %s",
756  			 op_ctx->ctx_export->fullpath);
757  	
758  		glfsexport = gsh_calloc(1, sizeof(struct glusterfs_export));
759  	
760  		rc = load_config_from_node(parse_node,
761  					   &export_param,
762  					   &params,
763  					   true,
764  					   err_type);
765  		if (rc != 0) {
766  			LogCrit(COMPONENT_FSAL,
767  				"Incorrect or missing parameters for export %s",
768  				op_ctx->ctx_export->fullpath);
769  			status.major = ERR_FSAL_INVAL;
770  			goto out;
771  		}
772  		LogEvent(COMPONENT_FSAL, "Volume %s exported at : '%s'",
773  			 params.glvolname, params.glvolpath);
774  	
775  		fsal_export_init(&glfsexport->export);
776  		export_ops_init(&glfsexport->export.exp_ops);
777  	
778  		glfsexport->gl_fs = glusterfs_get_fs(params, up_ops);
779  		if (!glfsexport->gl_fs) {
780  			status.major = ERR_FSAL_SERVERFAULT;
781  			goto out;
782  		}
783  	
784  		rc = fsal_attach_export(fsal_hdl, &glfsexport->export.exports);
785  		if (rc != 0) {
786  			status.major = ERR_FSAL_SERVERFAULT;
787  			LogCrit(COMPONENT_FSAL, "Unable to attach export. Export: %s",
788  				op_ctx->ctx_export->fullpath);
789  			goto out;
790  		}
791  		fsal_attached = true;
792  	
793  		glfsexport->mount_path = op_ctx->ctx_export->fullpath;
794  		glfsexport->export_path = params.glvolpath;
795  		glfsexport->saveduid = geteuid();
796  		glfsexport->savedgid = getegid();
797  		glfsexport->export.fsal = fsal_hdl;
798  		glfsexport->sec_label_xattr = params.sec_label_xattr;
799  	
800  		op_ctx->fsal_export = &glfsexport->export;
801  	
802  		glfsexport->pnfs_ds_enabled =
803  			glfsexport->export.exp_ops.fs_supports(&glfsexport->export,
804  							fso_pnfs_ds_supported);
805  		if (glfsexport->pnfs_ds_enabled) {
806  			struct fsal_pnfs_ds *pds = NULL;
807  	
808  			status =
809  			    fsal_hdl->m_ops.fsal_pnfs_ds(fsal_hdl, parse_node, &pds);
810  	
811  			if (status.major != ERR_FSAL_NO_ERROR)
812  				goto out;
813  	
814  			/* special case: server_id matches export_id */
815  			pds->id_servers = op_ctx->ctx_export->export_id;
816  			pds->mds_export = op_ctx->ctx_export;
817  			pds->mds_fsal_export = op_ctx->fsal_export;
818  	
819  			if (!pnfs_ds_insert(pds)) {
820  				LogCrit(COMPONENT_CONFIG,
821  					"Server id %d already in use.",
822  					pds->id_servers);
823  				status.major = ERR_FSAL_EXIST;
824  				goto out;
825  			}
826  	
827  			LogDebug(COMPONENT_PNFS,
828  				 "glusterfs_fsal_create: pnfs ds was enabled for [%s]",
829  				 op_ctx->ctx_export->fullpath);
830  		}
831  	
832  		glfsexport->pnfs_mds_enabled =
833  			glfsexport->export.exp_ops.fs_supports(&glfsexport->export,
834  							fso_pnfs_mds_supported);
835  		if (glfsexport->pnfs_mds_enabled) {
836  			LogDebug(COMPONENT_PNFS,
837  				 "glusterfs_fsal_create: pnfs mds was enabled for [%s]",
838  				 op_ctx->ctx_export->fullpath);
839  			export_ops_pnfs(&glfsexport->export.exp_ops);
840  			fsal_ops_pnfs(&glfsexport->export.fsal->m_ops);
841  		}
842  	
843  		glfsexport->export.up_ops = up_ops;
844  	
845  	 out:
846  		gsh_free(params.glvolname);
847  		gsh_free(params.glhostname);
848  		gsh_free(params.glfs_log);
849  	
850  		if (status.major != ERR_FSAL_NO_ERROR) {
851  			gsh_free(params.glvolpath);
852  	
853  			if (fsal_attached)
854  				fsal_detach_export(fsal_hdl,
855  						   &glfsexport->export.exports);
856  			if (glfsexport->gl_fs)
857  				glusterfs_free_fs(glfsexport->gl_fs);
858  			gsh_free(glfsexport);
859  		}
860  	
861  		return status;
862  	}
863  	
864