1    	/*
2    	 * vim:shiftwidth=8:tabstop=8:
3    	 *
4    	 * Copyright 2017-2019 Red Hat, Inc.
5    	 * Author: Daniel Gryniewicz  dang@redhat.com
6    	 *
7    	 *
8    	 * This program is free software; you can redistribute it and/or
9    	 * modify it under the terms of the GNU Lesser General Public
10   	 * License as published by the Free Software Foundation; either
11   	 * version 3 of the License, or (at your option) any later version.
12   	 *
13   	 * This program is distributed in the hope that it will be useful,
14   	 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15   	 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
16   	 * Lesser General Public License for more details.
17   	 *
18   	 * You should have received a copy of the GNU Lesser General Public
19   	 * License along with this library; if not, write to the Free Software
20   	 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21   	 * 02110-1301 USA
22   	 *
23   	 * -------------
24   	 */
25   	
26   	/* handle.c
27   	 */
28   	
29   	#include "config.h"
30   	
31   	#include <unistd.h>
32   	#include <stdlib.h>
33   	#include "fsal.h"
34   	#include "fsal_convert.h"
35   	#include "FSAL/fsal_commonlib.h"
36   	#include "mem_int.h"
37   	#include "city.h"
38   	#include "nfs_file_handle.h"
39   	#include "display.h"
40   	#ifdef USE_LTTNG
41   	#include "gsh_lttng/fsal_mem.h"
42   	#endif
43   	#include "../Stackable_FSALs/FSAL_MDCACHE/mdcache_ext.h"
44   	#include "nfs_core.h"
45   	#include "common_utils.h"
46   	
47   	static void mem_release(struct fsal_obj_handle *obj_hdl);
48   	
/*
 * Next inode number (fileid) to hand out in the mem FS.  Starts at 1 and is
 * advanced with atomic_postinc_uint64_t() each time _mem_alloc_handle()
 * creates a handle, so it must only be accessed through the atomic helpers.
 */
uint64_t mem_inode_number = 1;
51   	
52   	/* helpers
53   	 */
54   	
55   	static inline int
56   	mem_n_cmpf(const struct avltree_node *lhs,
57   			const struct avltree_node *rhs)
58   	{
59   		struct mem_dirent *lk, *rk;
60   	
61   		lk = avltree_container_of(lhs, struct mem_dirent, avl_n);
62   		rk = avltree_container_of(rhs, struct mem_dirent, avl_n);
63   	
64   		return strcmp(lk->d_name, rk->d_name);
65   	}
66   	
67   	static inline int
68   	mem_i_cmpf(const struct avltree_node *lhs,
69   			const struct avltree_node *rhs)
70   	{
71   		struct mem_dirent *lk, *rk;
72   	
73   		lk = avltree_container_of(lhs, struct mem_dirent, avl_i);
74   		rk = avltree_container_of(rhs, struct mem_dirent, avl_i);
75   	
76   		if (lk->d_index < rk->d_index)
77   			return -1;
78   	
79   		if (lk->d_index == rk->d_index)
80   			return 0;
81   	
82   		return 1;
83   	}
84   	
85   	/**
86   	 * @brief Clean up and free an object handle
87   	 *
88   	 * @param[in] obj_hdl	Handle to release
89   	 */
90   	static void mem_cleanup(struct mem_fsal_obj_handle *myself)
91   	{
92   		struct mem_fsal_export *mfe;
93   	
94   		mfe = myself->mfo_exp;
95   	
96   		if (myself->is_export || !glist_empty(&myself->dirents)) {
97   			/* Entry is still live: it's either an export, or in a dir.
98   			 * This is likely a bug. */
99   	#ifdef USE_LTTNG
100  			tracepoint(fsalmem, mem_inuse, __func__, __LINE__,
101  				   &myself->obj_handle, myself->attrs.numlinks,
102  				   myself->is_export);
103  	#endif
104  			LogDebug(COMPONENT_FSAL,
105  				 "Releasing live hdl=%p, name=%s, don't deconstruct it",
106  				 myself, myself->m_name);
107  			return;
108  		}
109  	
110  		fsal_obj_handle_fini(&myself->obj_handle);
111  	
112  		LogDebug(COMPONENT_FSAL,
113  			 "Releasing obj_hdl=%p, myself=%p, name=%s",
114  			 &myself->obj_handle, myself, myself->m_name);
115  	
116  		switch (myself->obj_handle.type) {
117  		case DIRECTORY:
118  			/* Empty directory */
119  			mem_clean_all_dirents(myself);
120  			break;
121  		case REGULAR_FILE:
122  			break;
123  		case SYMBOLIC_LINK:
124  			gsh_free(myself->mh_symlink.link_contents);
125  			break;
126  		case SOCKET_FILE:
127  		case CHARACTER_FILE:
128  		case BLOCK_FILE:
129  		case FIFO_FILE:
130  			break;
131  		default:
132  			break;
133  		}
134  	
135  		PTHREAD_RWLOCK_wrlock(&mfe->mfe_exp_lock);
136  		mem_free_handle(myself);
137  		PTHREAD_RWLOCK_unlock(&mfe->mfe_exp_lock);
138  	}
139  	
140  	#define mem_int_get_ref(myself) _mem_int_get_ref(myself, __func__, __LINE__)
141  	/**
142  	 * @brief Get a ref for a handle
143  	 *
144  	 * @param[in] myself	Handle to ref
145  	 * @param[in] func	Function getting ref
146  	 * @param[in] line	Line getting ref
147  	 */
148  	static void _mem_int_get_ref(struct mem_fsal_obj_handle *myself,
149  				     const char *func, int line)
150  	{
151  	#ifdef USE_LTTNG
152  		int32_t refcount =
153  	#endif
154  			atomic_inc_int32_t(&myself->refcount);
155  	
156  	#ifdef USE_LTTNG
157  		tracepoint(fsalmem, mem_get_ref, func, line, &myself->obj_handle,
158  			   myself->m_name, refcount);
159  	#endif
160  	}
161  	
162  	#define mem_int_put_ref(myself) _mem_int_put_ref(myself, __func__, __LINE__)
163  	/**
164  	 * @brief Put a ref for a handle
165  	 *
166  	 * If this is the last ref, clean up and free the handle
167  	 *
168  	 * @param[in] myself	Handle to ref
169  	 * @param[in] func	Function getting ref
170  	 * @param[in] line	Line getting ref
171  	 */
172  	static void _mem_int_put_ref(struct mem_fsal_obj_handle *myself,
173  				     const char *func, int line)
174  	{
175  		int32_t refcount = atomic_dec_int32_t(&myself->refcount);
176  	
177  	#ifdef USE_LTTNG
178  		tracepoint(fsalmem, mem_put_ref, func, line, &myself->obj_handle,
179  			   myself->m_name, refcount);
180  	#endif
181  	
182  		if (refcount == 0) {
183  			mem_cleanup(myself);
184  		}
185  	}
186  	
187  	/**
188  	 * @brief Construct the fs opaque part of a mem nfsv4 handle
189  	 *
190  	 * Given the components of a mem nfsv4 handle, the nfsv4 handle is
191  	 * created by concatenating the components. This is the fs opaque piece
192  	 * of struct file_handle_v4 and what is sent over the wire.
193  	 *
194  	 * @param[in] myself	Obj to create handle for
195  	 *
196  	 * @return The nfsv4 mem file handle as a char *
197  	 */
198  	static void package_mem_handle(struct mem_fsal_obj_handle *myself)
199  	{
200  		char buf[MAXPATHLEN];
201  		uint16_t len;
202  		uint64_t hashkey;
203  		int opaque_bytes_used = 0, pathlen = 0;
204  	
205  		memset(buf, 0, sizeof(buf));
206  	
207  		/* Make hashkey */
208  		len = sizeof(myself->obj_handle.fileid);
209  		memcpy(buf, &myself->obj_handle.fileid, len);
210  		strncpy(buf + len, myself->m_name, sizeof(buf) - len);
211  		hashkey = CityHash64(buf, sizeof(buf));
212  	
213  		memcpy(myself->handle, &hashkey, sizeof(hashkey));
214  		opaque_bytes_used += sizeof(hashkey);
215  	
216  		/* include length of the name in the handle.
217  		 * MAXPATHLEN=4096 ... max path length can be contained in a short int.
218  		 */
219  		len = strlen(myself->m_name);
220  		memcpy(myself->handle + opaque_bytes_used, &len, sizeof(len));
221  		opaque_bytes_used += sizeof(len);
222  	
223  		/* Either the nfsv4 fh opaque size or the length of the name.
224  		 * Ideally we can include entire mem name for guaranteed
225  		 * uniqueness of mem handles.
226  		 */
227  		pathlen = MIN(V4_FH_OPAQUE_SIZE - opaque_bytes_used, len);
228  		memcpy(myself->handle + opaque_bytes_used, myself->m_name, pathlen);
229  		opaque_bytes_used += pathlen;
230  	
231  		/* If there is more space in the opaque handle due to a short mem
232  		 * path ... zero it.
233  		 */
234  		if (opaque_bytes_used < V4_FH_OPAQUE_SIZE) {
235  			memset(myself->handle + opaque_bytes_used, 0,
236  			       V4_FH_OPAQUE_SIZE - opaque_bytes_used);
237  		}
238  	}
239  	
240  	/**
241  	 * @brief Insert an obj into it's parent's tree
242  	 *
243  	 * @param[in] parent	Parent directory
244  	 * @param[in] child	Child to insert.
245  	 * @param[in] name	Name to use for insertion
246  	 */
247  	static void mem_insert_obj(struct mem_fsal_obj_handle *parent,
248  				   struct mem_fsal_obj_handle *child,
249  				   const char *name)
250  	{
251  		struct mem_dirent *dirent;
252  		uint32_t numkids;
253  	
254  		dirent = gsh_calloc(1, sizeof(*dirent));
255  		dirent->hdl = child;
256  		mem_int_get_ref(child);
257  		dirent->dir = parent;
258  		dirent->d_name = gsh_strdup(name);
259  		/* Index is hash of the name */
260  		dirent->d_index = CityHash64(name, strlen(name));
261  	
262  		/* Link into child */
263  		PTHREAD_RWLOCK_wrlock(&child->obj_handle.obj_lock);
264  		glist_add_tail(&child->dirents, &dirent->dlist);
265  		PTHREAD_RWLOCK_unlock(&child->obj_handle.obj_lock);
266  	
267  		/* Link into parent */
268  		PTHREAD_RWLOCK_wrlock(&parent->obj_handle.obj_lock);
269  		/* Name tree */
270  		avltree_insert(&dirent->avl_n, &parent->mh_dir.avl_name);
271  		/* Index tree */
272  		avltree_insert(&dirent->avl_i, &parent->mh_dir.avl_index);
273  		/* Update numkids */
274  		numkids = atomic_inc_uint32_t(&parent->mh_dir.numkids);
275  		LogFullDebug(COMPONENT_FSAL, "%s numkids %"PRIu32, parent->m_name,
276  			     numkids);
277  	
278  		PTHREAD_RWLOCK_unlock(&parent->obj_handle.obj_lock);
279  	}
280  	
281  	/**
282  	 * @brief Find the dirent pointing to a name in a directory
283  	 *
284  	 * @param[in] dir	Directory to search
285  	 * @param[in] name	Name to look up
286  	 * @return Dirent on success, NULL on failure
287  	 */
288  	struct mem_dirent *
289  	mem_dirent_lookup(struct mem_fsal_obj_handle *dir, const char *name)
290  	{
291  		struct mem_dirent key;
292  		struct avltree_node *node;
293  	
294  		key.d_name = name;
295  	
296  		node = avltree_lookup(&key.avl_n, &dir->mh_dir.avl_name);
297  		if (!node) {
298  			/* it's not there */
299  			return NULL;
300  		}
301  	
302  		return avltree_container_of(node, struct mem_dirent, avl_n);
303  	}
304  	
305  	/**
306  	 * @brief Get the next dirent in a directory
307  	 *
308  	 * @note Caller must hold the obj_lock on the obj
309  	 *
310  	 * @param[in] dirent	Current dirent
311  	 * @return Next dirent, or NULL if at EOD
312  	 */
313  	static struct mem_dirent *
314  	mem_dirent_next(struct mem_dirent *dirent)
315  	{
316  		struct avltree_node *node;
317  	
318  		node = avltree_next(&dirent->avl_i);
319  		if (!node) {
320  			return NULL;
321  		}
322  	
323  		return avltree_container_of(node, struct mem_dirent, avl_i);
324  	}
325  	
326  	/**
327  	 * @brief Seek to a location in a directory
328  	 *
329  	 * Handle normal vs whence-is-name directories.
330  	 *
331  	 * @note Caller must hold the obj_lock on the obj
332  	 *
333  	 * @param[in] dir	Directory to seek in
334  	 * @param[in] seekloc	Location to seek to
335  	 * @return Dirent associated with seekloc
336  	 */
337  	static struct mem_dirent *
338  	mem_readdir_seekloc(struct mem_fsal_obj_handle *dir, fsal_cookie_t seekloc)
339  	{
340  		struct mem_dirent *dirent;
341  		struct avltree_node *node;
342  		struct mem_dirent key;
343  	
344  		if (!seekloc) {
345  			/* Start from the beginning.  We walk the index tree, so always
346  			 * grab from the index tree. */
347  			node = avltree_first(&dir->mh_dir.avl_index);
348  			if (!node) {
349  				return NULL;
350  			}
351  			dirent = avltree_container_of(node, struct mem_dirent,
352  						      avl_i);
353  			return dirent;
354  		}
355  	
356  	
357  		key.d_index = seekloc;
358  		node = avltree_lookup(&key.avl_i, &dir->mh_dir.avl_index);
359  		if (!node) {
360  			/* Dirent was probably deleted.  Find the next one */
361  			node = avltree_sup(&key.avl_i, &dir->mh_dir.avl_index);
362  		}
363  		if (!node) {
364  			/* Done */
365  			return NULL;
366  		}
367  	
368  		dirent = avltree_container_of(node, struct mem_dirent, avl_i);
369  	
370  		return dirent;
371  	}
372  	
373  	/**
374  	 * @brief Update the change attribute of the FSAL object
375  	 *
376  	 * @note Caller must hold the obj_lock on the obj
377  	 *
378  	 * @param[in] obj	FSAL obj which was modified
379  	 */
380  	static void mem_update_change_locked(struct mem_fsal_obj_handle *obj)
381  	{
382  		now(&obj->attrs.mtime);
383  		obj->attrs.ctime = obj->attrs.mtime;
384  		obj->attrs.change = timespec_to_nsecs(&obj->attrs.mtime);
385  	}
386  	
387  	/**
388  	 * @brief Remove an obj from it's parent's tree
389  	 *
390  	 * @note Caller must hold the obj_lock on the parent
391  	 *
392  	 * @param[in] parent	Parent directory
393  	 * @param[in] dirent	Dirent to remove
394  	 * @param[in] release	If true and no more dirents, release child
395  	 */
396  	static void mem_remove_dirent_locked(struct mem_fsal_obj_handle *parent,
397  					     struct mem_dirent *dirent)
398  	{
399  		struct mem_fsal_obj_handle *child;
400  		uint32_t numkids;
401  	
402  		avltree_remove(&dirent->avl_n, &parent->mh_dir.avl_name);
403  		avltree_remove(&dirent->avl_i, &parent->mh_dir.avl_index);
404  	
405  		/* Take the child lock, to remove from the child.  This should not race
406  		 * with @r mem_insert_obj since that takes the locks seqentially */
407  		child = dirent->hdl;
408  		PTHREAD_RWLOCK_wrlock(&child->obj_handle.obj_lock);
409  		glist_del(&dirent->dlist);
410  		PTHREAD_RWLOCK_unlock(&child->obj_handle.obj_lock);
411  	
412  		numkids = atomic_dec_uint32_t(&parent->mh_dir.numkids);
413  		LogFullDebug(COMPONENT_FSAL, "%s numkids %"PRIu32, parent->m_name,
414  			     numkids);
415  	
416  		/* Free dirent */
417  		gsh_free((char *)dirent->d_name);
418  		gsh_free(dirent);
419  	
420  		mem_int_put_ref(child);
421  	
422  		mem_update_change_locked(parent);
423  	}
424  	
425  	/**
426  	 * @brief Remove a dirent from it's parent's tree
427  	 *
428  	 * @param[in] parent	Parent directory
429  	 * @param[in] name	Name to remove
430  	 */
431  	static void mem_remove_dirent(struct mem_fsal_obj_handle *parent,
432  				      const char *name)
433  	{
434  		struct mem_dirent *dirent;
435  	
436  		PTHREAD_RWLOCK_wrlock(&parent->obj_handle.obj_lock);
437  	
438  		dirent = mem_dirent_lookup(parent, name);
439  		if (dirent)
440  			mem_remove_dirent_locked(parent, dirent);
441  	
442  		PTHREAD_RWLOCK_unlock(&parent->obj_handle.obj_lock);
443  	}
444  	
445  	/**
446  	 * @brief Recursively clean all objs/dirents on an export
447  	 *
448  	 * @note Caller MUST hold export lock for write
449  	 *
450  	 * @param[in] root	Root to clean
451  	 * @return Return description
452  	 */
453  	void mem_clean_export(struct mem_fsal_obj_handle *root)
454  	{
455  		struct mem_fsal_obj_handle *child;
456  		struct avltree_node *node;
457  		struct mem_dirent *dirent;
458  	
459  	#ifdef USE_LTTNG
460  		tracepoint(fsalmem, mem_inuse, __func__, __LINE__, &root->obj_handle,
461  			   root->attrs.numlinks, root->is_export);
462  	#endif
463  		while ((node = avltree_first(&root->mh_dir.avl_name))) {
464  			dirent = avltree_container_of(node, struct mem_dirent, avl_n);
465  	
466  			child = dirent->hdl;
467  			if (child->obj_handle.type == DIRECTORY) {
468  				mem_clean_export(child);
469  			}
470  	
471  			PTHREAD_RWLOCK_wrlock(&root->obj_handle.obj_lock);
472  			mem_remove_dirent_locked(root, dirent);
473  			PTHREAD_RWLOCK_unlock(&root->obj_handle.obj_lock);
474  		}
475  	
476  	}
477  	
478  	/**
479  	 * @brief Remove all children from a directory's tree
480  	 *
481  	 * @param[in] parent	Directroy to clean
482  	 */
483  	void mem_clean_all_dirents(struct mem_fsal_obj_handle *parent)
484  	{
485  		struct avltree_node *node;
486  		struct mem_dirent *dirent;
487  	
488  		PTHREAD_RWLOCK_wrlock(&parent->obj_handle.obj_lock);
489  	
490  		while ((node = avltree_first(&parent->mh_dir.avl_name))) {
491  			dirent = avltree_container_of(node, struct mem_dirent, avl_n);
492  			mem_remove_dirent_locked(parent, dirent);
493  		}
494  	
495  		PTHREAD_RWLOCK_unlock(&parent->obj_handle.obj_lock);
496  	}
497  	
498  	static void mem_copy_attrs_mask(struct attrlist *attrs_in,
499  					struct attrlist *attrs_out)
500  	{
501  		/* Use full timer resolution */
502  		now(&attrs_out->ctime);
503  	
504  		if (FSAL_TEST_MASK(attrs_in->valid_mask, ATTR_SIZE)) {
505  			attrs_out->filesize = attrs_in->filesize;
506  		}
507  	
508  		if (FSAL_TEST_MASK(attrs_in->valid_mask, ATTR_MODE)) {
509  			attrs_out->mode = attrs_in->mode & (~S_IFMT & 0xFFFF) &
510  				~op_ctx->fsal_export->exp_ops.fs_umask(
511  							op_ctx->fsal_export);
512  		}
513  	
514  		if (FSAL_TEST_MASK(attrs_in->valid_mask, ATTR_OWNER)) {
515  			attrs_out->owner = attrs_in->owner;
516  		}
517  	
518  		if (FSAL_TEST_MASK(attrs_in->valid_mask, ATTR_GROUP)) {
519  			attrs_out->group = attrs_in->group;
520  		}
521  	
522  		if (FSAL_TEST_MASK(attrs_in->valid_mask, ATTRS_SET_TIME)) {
523  			if (FSAL_TEST_MASK(attrs_in->valid_mask, ATTR_ATIME_SERVER)) {
524  				attrs_out->atime.tv_sec = 0;
525  				attrs_out->atime.tv_nsec = UTIME_NOW;
526  			} else if (FSAL_TEST_MASK(attrs_in->valid_mask, ATTR_ATIME)) {
527  				attrs_out->atime = attrs_in->atime;
528  			} else {
529  				attrs_out->atime = attrs_out->ctime;
530  			}
531  	
532  			if (FSAL_TEST_MASK(attrs_in->valid_mask, ATTR_MTIME_SERVER)) {
533  				attrs_out->mtime.tv_sec = 0;
534  				attrs_out->mtime.tv_nsec = UTIME_NOW;
535  			} else if (FSAL_TEST_MASK(attrs_in->valid_mask, ATTR_MTIME)) {
536  				attrs_out->mtime = attrs_in->mtime;
537  			} else {
538  				attrs_out->mtime = attrs_out->ctime;
539  			}
540  		}
541  	
542  		if (FSAL_TEST_MASK(attrs_in->valid_mask, ATTR_CREATION)) {
543  			attrs_out->creation = attrs_in->creation;
544  		}
545  	
546  		if (FSAL_TEST_MASK(attrs_in->valid_mask, ATTR_SPACEUSED)) {
547  			attrs_out->spaceused = attrs_in->spaceused;
548  		} else {
549  			attrs_out->spaceused = attrs_out->filesize;
550  		}
551  	
552  		/* XXX TODO copy ACL */
553  	
554  		/** @todo FSF - this calculation may be different than what particular
555  		 *              FSALs use. Is that a problem?
556  		 */
557  		attrs_out->change = timespec_to_nsecs(&attrs_out->ctime);
558  	}
559  	
560  	/**
561  	 * @brief Open an FD
562  	 *
563  	 * @param[in] fd	FD to close
564  	 * @return FSAL status
565  	 */
566  	static fsal_status_t mem_open_my_fd(struct fsal_fd *fd,
567  					    fsal_openflags_t openflags)
568  	{
569  		fd->openflags = FSAL_O_NFS_FLAGS(openflags);
570  	
571  		return fsalstat(ERR_FSAL_NO_ERROR, 0);
572  	}
573  	
574  	/**
575  	 * @brief Close an FD
576  	 *
577  	 * @param[in] fd	FD to close
578  	 * @return FSAL status
579  	 */
580  	static fsal_status_t mem_close_my_fd(struct fsal_fd *fd)
581  	{
582  		if (fd->openflags == FSAL_O_CLOSED)
583  			return fsalstat(ERR_FSAL_NOT_OPENED, 0);
584  	
585  		fd->openflags = FSAL_O_CLOSED;
586  	
587  		return fsalstat(ERR_FSAL_NO_ERROR, 0);
588  	}
589  	
590  	/**
591  	 * @brief Function to open an fsal_obj_handle's global file descriptor.
592  	 *
593  	 * @param[in]  obj_hdl     File on which to operate
594  	 * @param[in]  openflags   Mode for open
595  	 * @param[out] fd          File descriptor that is to be used
596  	 *
597  	 * @return FSAL status.
598  	 */
599  	
600  	static fsal_status_t mem_open_func(struct fsal_obj_handle *obj_hdl,
601  					   fsal_openflags_t openflags,
602  					   struct fsal_fd *fd)
603  	{
604  		return mem_open_my_fd(fd, openflags);
605  	}
606  	
607  	/**
608  	 * @brief Close a global FD
609  	 *
610  	 * @param[in] obj_hdl	Object owning FD to close
611  	 * @param[in] fd	FD to close
612  	 * @return FSAL status
613  	 */
614  	static fsal_status_t mem_close_func(struct fsal_obj_handle *obj_hdl,
615  					    struct fsal_fd *fd)
616  	{
617  		return mem_close_my_fd(fd);
618  	}
619  	
620  	#define mem_alloc_handle(p, n, t, e, a) \
621  		_mem_alloc_handle(p, n, t, e, a, __func__, __LINE__)
622  	/**
623  	 * @brief Allocate a MEM handle
624  	 *
625  	 * @param[in] parent	Parent directory handle
626  	 * @param[in] name	Name of handle to allocate
627  	 * @param[in] type	Type of handle to allocate
628  	 * @param[in] mfe	MEM Export owning new handle
629  	 * @param[in] attrs	Attributes of new handle
630  	 * @return Handle on success, NULL on failure
631  	 */
632  	static struct mem_fsal_obj_handle *
633  	_mem_alloc_handle(struct mem_fsal_obj_handle *parent,
634  			  const char *name,
635  			  object_file_type_t type,
636  			  struct mem_fsal_export *mfe,
637  			  struct attrlist *attrs,
638  			  const char *func, int line)
639  	{
640  		struct mem_fsal_obj_handle *hdl;
641  		size_t isize;
642  	
643  		isize = sizeof(struct mem_fsal_obj_handle);
644  		if (type == REGULAR_FILE) {
645  			/* Regular files need space to read/write */
646  			isize += MEM.inode_size;
647  		}
648  	
649  		hdl = gsh_calloc(1, isize);
650  	
651  		/* Establish tree details for this directory */
652  		hdl->m_name = gsh_strdup(name);
653  		hdl->obj_handle.fileid = atomic_postinc_uint64_t(&mem_inode_number);
654  		hdl->datasize = MEM.inode_size;
655  		glist_init(&hdl->dirents);
656  		PTHREAD_RWLOCK_wrlock(&mfe->mfe_exp_lock);
657  		glist_add_tail(&mfe->mfe_objs, &hdl->mfo_exp_entry);
658  		hdl->mfo_exp = mfe;
659  		PTHREAD_RWLOCK_unlock(&mfe->mfe_exp_lock);
660  		package_mem_handle(hdl);
661  	
662  		/* Fills the output struct */
663  		hdl->obj_handle.type = type;
664  		hdl->attrs.type = hdl->obj_handle.type;
665  	
666  		/* Need an FSID */
667  		hdl->obj_handle.fsid.major = op_ctx->ctx_export->export_id;
668  		hdl->obj_handle.fsid.minor = 0;
669  		hdl->attrs.fsid.major = hdl->obj_handle.fsid.major;
670  		hdl->attrs.fsid.minor = hdl->obj_handle.fsid.minor;
671  		hdl->attrs.fileid = hdl->obj_handle.fileid;
672  	
673  		if ((attrs && attrs->valid_mask & ATTR_MODE) != 0)
674  			hdl->attrs.mode = attrs->mode & (~S_IFMT & 0xFFFF) &
675  				~op_ctx->fsal_export->exp_ops.fs_umask(
676  							op_ctx->fsal_export);
677  		else
678  			hdl->attrs.mode = 0600;
679  	
680  		if ((attrs && attrs->valid_mask & ATTR_OWNER) != 0)
681  			hdl->attrs.owner = attrs->owner;
682  		else
683  			hdl->attrs.owner = op_ctx->creds->caller_uid;
684  	
685  		if ((attrs && attrs->valid_mask & ATTR_GROUP) != 0)
686  			hdl->attrs.group = attrs->group;
687  		else
688  			hdl->attrs.group = op_ctx->creds->caller_gid;
689  	
690  		/* Use full timer resolution */
691  		now(&hdl->attrs.ctime);
692  	
693  		if ((attrs && attrs->valid_mask & ATTR_ATIME) != 0)
694  			hdl->attrs.atime = attrs->atime;
695  		else
696  			hdl->attrs.atime = hdl->attrs.ctime;
697  	
698  		if ((attrs && attrs->valid_mask & ATTR_MTIME) != 0)
699  			hdl->attrs.mtime = attrs->mtime;
700  		else
701  			hdl->attrs.mtime = hdl->attrs.ctime;
702  	
703  	
704  		/** @todo FSF - this calculation may be different than what particular
705  		 *              FSALs use. Is that a problem?
706  		 */
707  		hdl->attrs.change =
708  			timespec_to_nsecs(&hdl->attrs.ctime);
709  	
710  		switch (type) {
711  		case REGULAR_FILE:
712  			if ((attrs && attrs->valid_mask & ATTR_SIZE) != 0) {
713  				hdl->attrs.filesize = attrs->filesize;
714  				hdl->attrs.spaceused = attrs->filesize;
715  			} else {
716  				hdl->attrs.filesize = 0;
717  				hdl->attrs.spaceused = 0;
718  			}
719  			hdl->attrs.numlinks = 1;
720  			break;
721  		case BLOCK_FILE:
722  		case CHARACTER_FILE:
723  			if ((attrs && attrs->valid_mask & ATTR_RAWDEV) != 0) {
724  				hdl->attrs.rawdev.major = attrs->rawdev.major;
725  				hdl->attrs.rawdev.minor = attrs->rawdev.minor;
726  			} else {
727  				hdl->attrs.rawdev.major = 0;
728  				hdl->attrs.rawdev.minor = 0;
729  			}
730  			hdl->attrs.numlinks = 1;
731  			break;
732  		case DIRECTORY:
733  			avltree_init(&hdl->mh_dir.avl_name, mem_n_cmpf, 0);
734  			avltree_init(&hdl->mh_dir.avl_index, mem_i_cmpf, 0);
735  			hdl->attrs.numlinks = 2;
736  			hdl->mh_dir.numkids = 2;
737  			hdl->mh_dir.parent = parent;
738  			break;
739  		default:
740  			hdl->attrs.numlinks = 1;
741  			break;
742  		}
743  	
744  	
745  	
746  		/* Set the mask at the end. */
747  		hdl->attrs.valid_mask = ATTRS_POSIX;
748  		hdl->attrs.supported = ATTRS_POSIX;
749  	
750  		/* Initial ref */
751  		hdl->refcount = 1;
752  	#ifdef USE_LTTNG
753  		tracepoint(fsalmem, mem_alloc, func, line, &hdl->obj_handle, name,
754  			   hdl->refcount);
755  	#endif
756  	
757  		fsal_obj_handle_init(&hdl->obj_handle, &mfe->export, type);
758  		hdl->obj_handle.obj_ops = &MEM.handle_ops;
759  	
760  		if (parent != NULL) {
761  			/* Attach myself to my parent */
762  			mem_insert_obj(parent, hdl, name);
763  		} else {
764  			/* This is an export */
765  			hdl->is_export = true;
766  		}
767  	
768  		return hdl;
769  	}
770  	
771  	#define  mem_int_lookup(d, p, e) _mem_int_lookup(d, p, e, __func__, __LINE__)
772  	static fsal_status_t _mem_int_lookup(struct mem_fsal_obj_handle *dir,
773  					     const char *path,
774  					     struct mem_fsal_obj_handle **entry,
775  					     const char *func, int line)
776  	{
777  		struct mem_dirent *dirent;
778  	
779  		*entry = NULL;
780  		LogFullDebug(COMPONENT_FSAL, "Lookup %s in %p", path, dir);
781  	
782  	#ifdef USE_LTTNG
783  		tracepoint(fsalmem, mem_lookup, func, line, &dir->obj_handle, path);
784  	#endif
785  		if (strcmp(path, "..") == 0) {
786  			/* lookup parent - lookupp */
787  			if (dir->mh_dir.parent == NULL) {
788  				return fsalstat(ERR_FSAL_NOENT, 0);
789  			}
790  	
791  			*entry = dir->mh_dir.parent;
792  			LogFullDebug(COMPONENT_FSAL,
793  				     "Found %s/%s hdl=%p",
794  				     dir->m_name, path, *entry);
795  			return fsalstat(ERR_FSAL_NO_ERROR, 0);
796  		} else if (strcmp(path, ".") == 0) {
797  			*entry = dir;
798  			return fsalstat(ERR_FSAL_NO_ERROR, 0);
799  		}
800  	
801  		dirent = mem_dirent_lookup(dir, path);
802  		if (!dirent) {
803  			return fsalstat(ERR_FSAL_NOENT, 0);
804  		}
805  		*entry = dirent->hdl;
806  	
807  	#ifdef USE_LTTNG
808  		tracepoint(fsalmem, mem_lookup, func, line, &(*entry)->obj_handle,
809  			   (*entry)->m_name);
810  	#endif
811  		return fsalstat(ERR_FSAL_NO_ERROR, 0);
812  	}
813  	
814  	static fsal_status_t mem_create_obj(struct mem_fsal_obj_handle *parent,
815  					    object_file_type_t type,
816  					    const char *name,
817  					    struct attrlist *attrs_in,
818  					    struct fsal_obj_handle **new_obj,
819  					    struct attrlist *attrs_out)
820  	{
821  		struct mem_fsal_export *mfe = container_of(op_ctx->fsal_export,
822  							   struct mem_fsal_export,
823  							   export);
824  		struct mem_fsal_obj_handle *hdl;
825  		fsal_status_t status;
826  	
827  		*new_obj = NULL;		/* poison it */
828  	
829  		if (parent->obj_handle.type != DIRECTORY) {
830  			LogCrit(COMPONENT_FSAL,
831  				"Parent handle is not a directory. hdl = 0x%p",
832  				parent);
833  			return fsalstat(ERR_FSAL_NOTDIR, 0);
834  		}
835  	
836  		status = mem_int_lookup(parent, name, &hdl);
837  		if (!FSAL_IS_ERROR(status)) {
838  			/* It already exists */
839  			return fsalstat(ERR_FSAL_EXIST, 0);
840  		} else if (status.major != ERR_FSAL_NOENT) {
841  			/* Some other error */
842  			return status;
843  		}
844  	
845  		/* allocate an obj_handle and fill it up */
846  		hdl = mem_alloc_handle(parent,
847  				       name,
848  				       type,
849  				       mfe,
850  				       attrs_in);
851  		if (!hdl)
852  			return fsalstat(ERR_FSAL_NOMEM, 0);
853  	
854  		*new_obj = &hdl->obj_handle;
855  	
856  		if (attrs_out != NULL)
857  			fsal_copy_attrs(attrs_out, &hdl->attrs, false);
858  	
859  		return fsalstat(ERR_FSAL_NO_ERROR, 0);
860  	}
861  	
862  	/* handle methods
863  	 */
864  	
865  	/**
866  	 * @brief Lookup a file
867  	 *
868  	 * @param[in] parent	Parent directory
869  	 * @param[in] path	Path to lookup
870  	 * @param[out] handle	Found handle, on success
871  	 * @param[out] attrs_out	Attributes of found handle
872  	 * @return FSAL status
873  	 */
874  	static fsal_status_t mem_lookup(struct fsal_obj_handle *parent,
875  					const char *path,
876  					struct fsal_obj_handle **handle,
877  					struct attrlist *attrs_out)
878  	{
879  		struct mem_fsal_obj_handle *myself, *hdl = NULL;
880  		fsal_status_t status;
881  	
882  		myself = container_of(parent,
883  				      struct mem_fsal_obj_handle,
884  				      obj_handle);
885  	
886  		/* Check if this context already holds the lock on
887  		 * this directory.
888  		 */
889  		if (op_ctx->fsal_private != parent)
890  			PTHREAD_RWLOCK_rdlock(&parent->obj_lock);
891  		else
892  			LogFullDebug(COMPONENT_FSAL,
893  				     "Skipping lock for %s",
894  				     myself->m_name);
895  	
896  		status = mem_int_lookup(myself, path, &hdl);
897  		if (FSAL_IS_ERROR(status)) {
898  			goto out;
899  		}
900  	
901  		*handle = &hdl->obj_handle;
902  		mem_int_get_ref(hdl);
903  	
904  	out:
905  		if (op_ctx->fsal_private != parent)
906  			PTHREAD_RWLOCK_unlock(&parent->obj_lock);
907  	
908  		if (!FSAL_IS_ERROR(status) && attrs_out != NULL) {
909  			/* This is unlocked, however, for the most part, attributes
910  			 * are read-only. Come back later and do some lock protection.
911  			 */
912  			fsal_copy_attrs(attrs_out, &hdl->attrs, false);
913  		}
914  	
915  		return status;
916  	}
917  	
/**
 * @brief Read a directory
 *
 * Walks the directory's index tree starting at the cookie in @a whence (or
 * from the beginning when @a whence is NULL or 0) and invokes @a cb once per
 * dirent.  The directory's obj_lock is held for read for the whole walk, and
 * op_ctx->fsal_private is pointed at @a dir_hdl for that duration so that
 * mem_lookup() on the same directory skips taking the lock again.
 *
 * The cookie passed to the callback for an entry is the d_index of the
 * following dirent, or UINT64_MAX when there is none.
 *
 * @param[in] dir_hdl	the directory to read
 * @param[in] whence	where to start (next)
 * @param[in] dir_state	pass thru of state to callback
 * @param[in] cb	callback function
 * @param[in] attrmask	attribute mask used to prepare the attrs handed to cb
 * @param[out] eof	eof marker true == end of dir
 *
 * @return FSAL status (always ERR_FSAL_NO_ERROR)
 */

static fsal_status_t mem_readdir(struct fsal_obj_handle *dir_hdl,
				 fsal_cookie_t *whence,
				 void *dir_state,
				 fsal_readdir_cb cb,
				 attrmask_t attrmask,
				 bool *eof)
{
	struct mem_fsal_obj_handle *myself;
	struct mem_dirent *dirent, *dirent_next;
	fsal_cookie_t cookie = 0;
	struct attrlist attrs;
	enum fsal_dir_result cb_rc;
	int count = 0;

	myself = container_of(dir_hdl,
			      struct mem_fsal_obj_handle,
			      obj_handle);

	if (whence != NULL)
		cookie = *whence;

	/* Assume the walk reaches the end; cleared on any early exit below */
	*eof = true;

#ifdef USE_LTTNG
	tracepoint(fsalmem, mem_readdir, __func__, __LINE__, dir_hdl,
		   myself->m_name, cookie);
#endif
	LogFullDebug(COMPONENT_FSAL, "hdl=%p, name=%s",
		     myself, myself->m_name);

	PTHREAD_RWLOCK_rdlock(&dir_hdl->obj_lock);

	/* Use fsal_private to signal to lookup that we hold
	 * the lock.
	 */
	op_ctx->fsal_private = dir_hdl;

	dirent = mem_readdir_seekloc(myself, cookie);

	/* Always run in index order */
	for (;
	     dirent != NULL;
	     dirent = dirent_next) {

		if (count >= 2 * mdcache_param.dir.avl_chunk) {
			LogFullDebug(COMPONENT_FSAL, "readahead done %d",
				     count);
			/* Limit readahead: stop once 2 * avl_chunk entries
			 * have been handed to the callback */
			*eof = false;
			break;
		}

		/* Work out the successor before calling back, since its
		 * index is the cookie reported for the current entry */
		dirent_next = mem_dirent_next(dirent);
		if (dirent_next) {
			cookie = dirent_next->d_index;
		} else {
			cookie = UINT64_MAX;
		}

		fsal_prepare_attrs(&attrs, attrmask);
		fsal_copy_attrs(&attrs, &dirent->hdl->attrs, false);
		/* This ref is not dropped in this function; presumably the
		 * callback takes ownership of it -- confirm against cb */
		mem_int_get_ref(dirent->hdl);

		cb_rc = cb(dirent->d_name, &dirent->hdl->obj_handle, &attrs,
			   dir_state, cookie);

		fsal_release_attrs(&attrs);

		count++;

		/* The callback asked to stop: report "not at end of dir" */
		if (cb_rc >= DIR_TERMINATE) {
			*eof = false;
			break;
		}
	}

	/* Clear the "lock held" signal before actually dropping the lock */
	op_ctx->fsal_private = NULL;

	PTHREAD_RWLOCK_unlock(&dir_hdl->obj_lock);

	return fsalstat(ERR_FSAL_NO_ERROR, 0);
}
1010 	
1011 	/**
1012 	 * @brief Create a directory
1013 	 *
1014 	 * This function creates a new directory.
1015 	 *
1016 	 * While FSAL_MEM is a support_ex FSAL, it doesn't actually support
1017 	 * setting attributes, so only the mode attribute is relevant. Any other
1018 	 * attributes set on creation will be ignored. The owner and group will be
1019 	 * set from the active credentials.
1020 	 *
1021 	 * @param[in]     dir_hdl   Directory in which to create the directory
1022 	 * @param[in]     name      Name of directory to create
1023 	 * @param[in]     attrs_in  Attributes to set on newly created object
1024 	 * @param[out]    new_obj   Newly created object
1025 	 * @param[in,out] attrs_out Optional attributes for newly created object
1026 	 *
1027 	 * @note On success, @a new_obj has been ref'd
1028 	 *
1029 	 * @return FSAL status.
1030 	 */
1031 	static fsal_status_t mem_mkdir(struct fsal_obj_handle *dir_hdl,
1032 				       const char *name,
1033 				       struct attrlist *attrs_in,
1034 				       struct fsal_obj_handle **new_obj,
1035 				       struct attrlist *attrs_out)
1036 	{
1037 		struct mem_fsal_obj_handle *parent =
1038 			container_of(dir_hdl, struct mem_fsal_obj_handle, obj_handle);
1039 	
1040 		LogDebug(COMPONENT_FSAL, "mkdir %s", name);
1041 	
1042 	#ifdef USE_LTTNG
1043 		tracepoint(fsalmem, mem_mkdir, __func__, __LINE__, dir_hdl,
1044 			   parent->m_name, name);
1045 	#endif
1046 	
1047 		return mem_create_obj(parent, DIRECTORY, name, attrs_in, new_obj,
1048 				      attrs_out);
1049 	}
1050 	
1051 	/**
1052 	 * @brief Make a device node
1053 	 *
1054 	 * @param[in] dir_hdl	Parent directory handle
1055 	 * @param[in] name	Name of new node
1056 	 * @param[in] nodetype	Type of new node
1057 	 * @param[in] attrs_in	Attributes for new node
1058 	 * @param[out] new_obj	New object handle on success
1059 	 *
1060 	 * @note This returns an INITIAL ref'd entry on success
1061 	 * @return FSAL status
1062 	 */
1063 	static fsal_status_t mem_mknode(struct fsal_obj_handle *dir_hdl,
1064 					const char *name, object_file_type_t nodetype,
1065 					struct attrlist *attrs_in,
1066 					struct fsal_obj_handle **new_obj,
1067 					struct attrlist *attrs_out)
1068 	{
1069 		struct mem_fsal_obj_handle *hdl, *parent =
1070 			container_of(dir_hdl, struct mem_fsal_obj_handle, obj_handle);
1071 		fsal_status_t status;
1072 	
1073 		LogDebug(COMPONENT_FSAL, "mknode %s", name);
1074 	
1075 		status = mem_create_obj(parent, nodetype, name, attrs_in, new_obj,
1076 					attrs_out);
1077 		if (unlikely(FSAL_IS_ERROR(status)))
1078 			return status;
1079 	
1080 		hdl = container_of(*new_obj, struct mem_fsal_obj_handle, obj_handle);
1081 	
1082 		hdl->mh_node.nodetype = nodetype;
1083 		hdl->mh_node.dev = attrs_in->rawdev; /* struct copy */
1084 	
1085 		return fsalstat(ERR_FSAL_NO_ERROR, 0);
1086 	}
1087 	
1088 	/**
1089 	 * @brief Make a symlink
1090 	 *
1091 	 * @param[in] dir_hdl	Parent directory handle
1092 	 * @param[in] name	Name of new node
1093 	 * @param[in] link_path	Contents of symlink
1094 	 * @param[in] attrs_in	Attributes for new simlink
1095 	 * @param[out] new_obj	New object handle on success
1096 	 *
1097 	 * @note This returns an INITIAL ref'd entry on success
1098 	 * @return FSAL status
1099 	 */
1100 	static fsal_status_t mem_symlink(struct fsal_obj_handle *dir_hdl,
1101 					 const char *name, const char *link_path,
1102 					 struct attrlist *attrs_in,
1103 					 struct fsal_obj_handle **new_obj,
1104 					 struct attrlist *attrs_out)
1105 	{
1106 		struct mem_fsal_obj_handle *hdl, *parent =
1107 			container_of(dir_hdl, struct mem_fsal_obj_handle, obj_handle);
1108 		fsal_status_t status;
1109 	
1110 		LogDebug(COMPONENT_FSAL, "symlink %s", name);
1111 	
1112 		status = mem_create_obj(parent, SYMBOLIC_LINK, name, attrs_in, new_obj,
1113 					attrs_out);
1114 		if (unlikely(FSAL_IS_ERROR(status)))
1115 			return status;
1116 	
1117 		hdl = container_of(*new_obj, struct mem_fsal_obj_handle, obj_handle);
1118 	
1119 		hdl->mh_symlink.link_contents = gsh_strdup(link_path);
1120 	
1121 		return fsalstat(ERR_FSAL_NO_ERROR, 0);
1122 	}
1123 	
1124 	/**
1125 	 * @brief Read a symlink
1126 	 *
1127 	 * @param[in] obj_hdl	Handle for symlink
1128 	 * @param[out] link_content	Buffer to fill with link contents
1129 	 * @param[in] refresh	If true, refresh attributes on symlink
1130 	 * @return FSAL status
1131 	 */
1132 	static fsal_status_t mem_readlink(struct fsal_obj_handle *obj_hdl,
1133 					  struct gsh_buffdesc *link_content,
1134 					  bool refresh)
1135 	{
1136 		struct mem_fsal_obj_handle *myself =
1137 			container_of(obj_hdl, struct mem_fsal_obj_handle, obj_handle);
1138 	
1139 		if (obj_hdl->type != SYMBOLIC_LINK) {
1140 			LogCrit(COMPONENT_FSAL,
1141 				"Handle is not a symlink. hdl = 0x%p",
1142 				obj_hdl);
1143 			return fsalstat(ERR_FSAL_INVAL, 0);
1144 		}
1145 	
1146 		link_content->len = strlen(myself->mh_symlink.link_contents) + 1;
1147 		link_content->addr = gsh_strdup(myself->mh_symlink.link_contents);
1148 	
1149 		return fsalstat(ERR_FSAL_NO_ERROR, 0);
1150 	}
1151 	
1152 	/**
1153 	 * @brief Get attributes for a file
1154 	 *
1155 	 * @param[in] obj_hdl	File to get
1156 	 * @param[out] outattrs	Attributes for file
1157 	 * @return FSAL status
1158 	 */
1159 	static fsal_status_t mem_getattrs(struct fsal_obj_handle *obj_hdl,
1160 					  struct attrlist *outattrs)
1161 	{
1162 		struct mem_fsal_obj_handle *myself =
1163 			container_of(obj_hdl, struct mem_fsal_obj_handle, obj_handle);
1164 	
1165 		if (!myself->is_export && glist_empty(&myself->dirents)) {
1166 			/* Removed entry - stale */
1167 			LogDebug(COMPONENT_FSAL,
1168 				 "Requesting attributes for removed entry %p, name=%s",
1169 				 myself, myself->m_name);
1170 			return fsalstat(ERR_FSAL_STALE, ESTALE);
1171 		}
1172 	
1173 		if (obj_hdl->type == DIRECTORY) {
1174 			/* We need to update the numlinks */
1175 			myself->attrs.numlinks =
1176 				atomic_fetch_uint32_t(&myself->mh_dir.numkids);
1177 		}
1178 	
1179 	#ifdef USE_LTTNG
1180 		tracepoint(fsalmem, mem_getattrs, __func__, __LINE__, obj_hdl,
1181 			   myself->m_name, myself->attrs.filesize,
1182 			   myself->attrs.numlinks, myself->attrs.change);
1183 	#endif
1184 		LogFullDebug(COMPONENT_FSAL,
1185 			     "hdl=%p, name=%s numlinks %"PRIu32,
1186 			     myself,
1187 			     myself->m_name,
1188 			     myself->attrs.numlinks);
1189 	
1190 		fsal_copy_attrs(outattrs, &myself->attrs, false);
1191 	
1192 		return fsalstat(ERR_FSAL_NO_ERROR, 0);
1193 	}
1194 	
1195 	/**
1196 	 * @brief Set attributes on an object
1197 	 *
1198 	 * This function sets attributes on an object.  Which attributes are
1199 	 * set is determined by attrs_set->valid_mask. The FSAL must manage bypass
1200 	 * or not of share reservations, and a state may be passed.
1201 	 *
1202 	 * @param[in] obj_hdl    File on which to operate
1203 	 * @param[in] state      state_t to use for this operation
1204 	 * @param[in] attrs_set Attributes to set
1205 	 *
1206 	 * @return FSAL status.
1207 	 */
1208 	fsal_status_t mem_setattr2(struct fsal_obj_handle *obj_hdl,
1209 				   bool bypass,
1210 				   struct state_t *state,
1211 				   struct attrlist *attrs_set)
1212 	{
1213 		struct mem_fsal_obj_handle *myself =
1214 			container_of(obj_hdl, struct mem_fsal_obj_handle, obj_handle);
1215 	
1216 		/* apply umask, if mode attribute is to be changed */
1217 		if (FSAL_TEST_MASK(attrs_set->valid_mask, ATTR_MODE))
1218 			attrs_set->mode &=
1219 			    ~op_ctx->fsal_export->exp_ops.fs_umask(op_ctx->fsal_export);
1220 	
1221 		/* Test if size is being set, make sure file is regular and if so,
1222 		 * require a read/write file descriptor.
1223 		 */
1224 		if (FSAL_TEST_MASK(attrs_set->valid_mask, ATTR_SIZE) &&
1225 		    obj_hdl->type != REGULAR_FILE) {
1226 			LogFullDebug(COMPONENT_FSAL,
1227 				     "Setting size on non-regular file");
1228 			return fsalstat(ERR_FSAL_INVAL, EINVAL);
1229 		}
1230 	
1231 		mem_copy_attrs_mask(attrs_set, &myself->attrs);
1232 	
1233 	#ifdef USE_LTTNG
1234 		tracepoint(fsalmem, mem_setattrs, __func__, __LINE__, obj_hdl,
1235 			   myself->m_name, myself->attrs.filesize,
1236 			   myself->attrs.numlinks, myself->attrs.change);
1237 	#endif
1238 		return fsalstat(ERR_FSAL_NO_ERROR, EINVAL);
1239 	}
1240 	
1241 	/**
1242 	 * @brief Hard link an obj
1243 	 *
1244 	 * @param[in] obj_hdl	File to link
1245 	 * @param[in] dir_hdl	Directory to link into
1246 	 * @param[in] name	Name to use for link
1247 	 *
1248 	 * @return FSAL status.
1249 	 */
1250 	fsal_status_t mem_link(struct fsal_obj_handle *obj_hdl,
1251 			       struct fsal_obj_handle *dir_hdl,
1252 			       const char *name)
1253 	{
1254 		struct mem_fsal_obj_handle *myself =
1255 			container_of(obj_hdl, struct mem_fsal_obj_handle, obj_handle);
1256 		struct mem_fsal_obj_handle *dir =
1257 			container_of(dir_hdl, struct mem_fsal_obj_handle, obj_handle);
1258 		struct mem_fsal_obj_handle *hdl;
1259 		fsal_status_t status = {0, 0};
1260 	
1261 		status = mem_int_lookup(dir, name, &hdl);
1262 		if (!FSAL_IS_ERROR(status)) {
1263 			/* It already exists */
1264 			return fsalstat(ERR_FSAL_EXIST, 0);
1265 		} else if (status.major != ERR_FSAL_NOENT) {
1266 			/* Some other error */
1267 			return status;
1268 		}
1269 	
1270 		mem_insert_obj(dir, myself, name);
1271 	
1272 		myself->attrs.numlinks++;
1273 	
1274 	#ifdef USE_LTTNG
1275 		tracepoint(fsalmem, mem_link, __func__, __LINE__, dir_hdl, dir->m_name,
1276 			   obj_hdl, myself->m_name, name, myself->attrs.numlinks);
1277 	#endif
1278 	
1279 		return fsalstat(ERR_FSAL_NO_ERROR, 0);
1280 	}
1281 	
1282 	/**
1283 	 * @brief Unlink a file
1284 	 *
1285 	 * @param[in] dir_hdl	Parent directory handle
1286 	 * @param[in] obj_hdl	Object being removed
1287 	 * @param[in] name	Name of object to remove
1288 	 * @return FSAL status
1289 	 */
1290 	static fsal_status_t mem_unlink(struct fsal_obj_handle *dir_hdl,
1291 					struct fsal_obj_handle *obj_hdl,
1292 					const char *name)
1293 	{
1294 		struct mem_fsal_obj_handle *parent, *myself;
1295 		fsal_status_t status = {0, 0};
1296 		uint32_t numkids;
1297 		struct mem_dirent *dirent;
1298 	
1299 		parent = container_of(dir_hdl,
1300 				      struct mem_fsal_obj_handle,
1301 				      obj_handle);
1302 		myself = container_of(obj_hdl,
1303 				      struct mem_fsal_obj_handle,
1304 				      obj_handle);
1305 	
1306 	#ifdef USE_LTTNG
1307 		tracepoint(fsalmem, mem_unlink, __func__, __LINE__, dir_hdl,
1308 			   parent->m_name, obj_hdl, myself->m_name,
1309 			   myself->attrs.numlinks);
1310 	#endif
1311 	
1312 		PTHREAD_RWLOCK_wrlock(&dir_hdl->obj_lock);
1313 	
1314 		switch (obj_hdl->type) {
1315 		case DIRECTORY:
1316 			/* Check if directory is empty */
1317 			numkids = atomic_fetch_uint32_t(&myself->mh_dir.numkids);
1318 			if (numkids > 2) {
1319 				LogFullDebug(COMPONENT_FSAL,
1320 					     "%s numkids %"PRIu32,
1321 					     myself->m_name, numkids);
1322 				status = fsalstat(ERR_FSAL_NOTEMPTY, 0);
1323 				goto unlock;
1324 			}
1325 	
1326 			break;
1327 		case REGULAR_FILE:
1328 			/* Openable. Make sure it's closed */
1329 			if (myself->mh_file.fd.openflags != FSAL_O_CLOSED) {
1330 				status = fsalstat(ERR_FSAL_FILE_OPEN, 0);
1331 				goto unlock;
1332 			}
1333 			/* FALLTHROUGH */
1334 		case SYMBOLIC_LINK:
1335 		case SOCKET_FILE:
1336 		case CHARACTER_FILE:
1337 		case BLOCK_FILE:
1338 		case FIFO_FILE:
1339 			/* Unopenable.  Just clean up */
1340 			myself->attrs.numlinks--;
1341 			break;
1342 		default:
1343 			break;
1344 		}
1345 	
1346 		/* Remove the dirent from the parent*/
1347 		dirent = mem_dirent_lookup(parent, name);
1348 		if (dirent) {
1349 			mem_remove_dirent_locked(parent, dirent);
1350 		}
1351 	
1352 	unlock:
1353 		PTHREAD_RWLOCK_unlock(&dir_hdl->obj_lock);
1354 	
1355 		return status;
1356 	}
1357 	
1358 	/**
1359 	 * @brief Close a file's global descriptor
1360 	 *
1361 	 * @param[in] obj_hdl    File on which to operate
1362 	 *
1363 	 * @return FSAL status.
1364 	 */
1365 	
1366 	fsal_status_t mem_close(struct fsal_obj_handle *obj_hdl)
1367 	{
1368 		struct mem_fsal_obj_handle *myself = container_of(obj_hdl,
1369 					  struct mem_fsal_obj_handle, obj_handle);
1370 		fsal_status_t status;
1371 	
1372 		assert(obj_hdl->type == REGULAR_FILE);
1373 	
1374 		/* Take write lock on object to protect file descriptor.
1375 		 * This can block over an I/O operation.
1376 		 */
1377 		PTHREAD_RWLOCK_wrlock(&obj_hdl->obj_lock);
1378 	
1379 		status = mem_close_my_fd(&myself->mh_file.fd);
1380 	
1381 		PTHREAD_RWLOCK_unlock(&obj_hdl->obj_lock);
1382 	
1383 		return status;
1384 	}
1385 	
1386 	/**
1387 	 * @brief Rename an object
1388 	 *
1389 	 * Rename the given object from @a old_name in @a olddir_hdl to @a new_name in
1390 	 * @a newdir_hdl.  The old and new directories may be the same.
1391 	 *
1392 	 * @param[in] obj_hdl	Object to rename
1393 	 * @param[in] olddir_hdl	Directory containing @a obj_hdl
1394 	 * @param[in] old_name	Current name of @a obj_hdl
1395 	 * @param[in] newdir_hdl	Directory to move @a obj_hdl to
1396 	 * @param[in] new_name	Name to rename @a obj_hdl to
1397 	 * @return FSAL status
1398 	 */
1399 	static fsal_status_t mem_rename(struct fsal_obj_handle *obj_hdl,
1400 					struct fsal_obj_handle *olddir_hdl,
1401 					const char *old_name,
1402 					struct fsal_obj_handle *newdir_hdl,
1403 					const char *new_name)
1404 	{
1405 		struct mem_fsal_obj_handle *mem_olddir =
1406 			container_of(olddir_hdl, struct mem_fsal_obj_handle,
1407 				     obj_handle);
1408 		struct mem_fsal_obj_handle *mem_newdir =
1409 			container_of(newdir_hdl, struct mem_fsal_obj_handle,
1410 				     obj_handle);
1411 		struct mem_fsal_obj_handle *mem_obj =
1412 			container_of(obj_hdl, struct mem_fsal_obj_handle, obj_handle);
1413 		struct mem_fsal_obj_handle *mem_lookup_dst = NULL;
1414 		fsal_status_t status;
1415 	
1416 		status = mem_int_lookup(mem_newdir, new_name, &mem_lookup_dst);
1417 		if (!FSAL_IS_ERROR(status)) {
1418 			uint32_t numkids;
1419 	
1420 			if (mem_obj == mem_lookup_dst) {
1421 				/* Same source and destination */
1422 				return status;
1423 			}
1424 	
1425 			if ((obj_hdl->type == DIRECTORY &&
1426 			     mem_lookup_dst->obj_handle.type != DIRECTORY) ||
1427 			    (obj_hdl->type != DIRECTORY &&
1428 			     mem_lookup_dst->obj_handle.type == DIRECTORY)) {
1429 				/* Types must be "compatible" */
1430 				return fsalstat(ERR_FSAL_EXIST, 0);
1431 			}
1432 	
1433 			numkids = atomic_fetch_uint32_t(
1434 					&mem_lookup_dst->mh_dir.numkids);
1435 			if (mem_lookup_dst->obj_handle.type == DIRECTORY &&
1436 			    numkids > 2) {
1437 				/* Target dir must be empty */
1438 				return fsalstat(ERR_FSAL_EXIST, 0);
1439 			}
1440 	
1441 			/* Unlink destination */
1442 			status = mem_unlink(newdir_hdl, &mem_lookup_dst->obj_handle,
1443 					    new_name);
1444 			if (FSAL_IS_ERROR(status)) {
1445 				return status;
1446 			}
1447 		}
1448 	
1449 	#ifdef USE_LTTNG
1450 		tracepoint(fsalmem, mem_rename, __func__, __LINE__, obj_hdl,
1451 			   mem_olddir->m_name, old_name, mem_newdir->m_name, new_name);
1452 	#endif
1453 	
1454 		/* Remove from old dir */
1455 		mem_remove_dirent(mem_olddir, old_name);
1456 	
1457 		if (!strcmp(old_name, mem_obj->m_name)) {
1458 			/* Change base name */
1459 			gsh_free(mem_obj->m_name);
1460 			mem_obj->m_name = gsh_strdup(new_name);
1461 		}
1462 	
1463 		/* Insert into new directory */
1464 		mem_insert_obj(mem_newdir, mem_obj, new_name);
1465 	
1466 		return fsalstat(ERR_FSAL_NO_ERROR, 0);
1467 	}
1468 	
1469 	/**
1470 	 * @brief Open a file descriptor for read or write and possibly create
1471 	 *
1472 	 * @param[in] obj_hdl               File to open or parent directory
1473 	 * @param[in,out] state             state_t to use for this operation
1474 	 * @param[in] openflags             Mode for open
1475 	 * @param[in] createmode            Mode for create
1476 	 * @param[in] name                  Name for file if being created or opened
1477 	 * @param[in] attrs_in              Attributes to set on created file
1478 	 * @param[in] verifier              Verifier to use for exclusive create
1479 	 * @param[in,out] new_obj           Newly created object
1480 	 * @param[in,out] attrs_out         Optional attributes for newly created object
1481 	 * @param[in,out] caller_perm_check The caller must do a permission check
1482 	 *
1483 	 * @return FSAL status.
1484 	 */
1485 	fsal_status_t mem_open2(struct fsal_obj_handle *obj_hdl,
1486 				struct state_t *state,
1487 				fsal_openflags_t openflags,
1488 				enum fsal_create_mode createmode,
1489 				const char *name,
1490 				struct attrlist *attrs_set,
1491 				fsal_verifier_t verifier,
1492 				struct fsal_obj_handle **new_obj,
1493 				struct attrlist *attrs_out,
1494 				bool *caller_perm_check)
1495 	{
1496 		fsal_status_t status = {0, 0};
1497 		struct fsal_fd *my_fd = NULL;
1498 		struct mem_fsal_obj_handle *myself, *hdl = NULL;
1499 		bool truncated;
1500 		bool setattrs = attrs_set != NULL;
1501 		bool created = false;
1502 		struct attrlist verifier_attr;
1503 	
1504 		if (state != NULL)
1505 			my_fd = (struct fsal_fd *)(state + 1);
1506 	
1507 		myself = container_of(obj_hdl, struct mem_fsal_obj_handle, obj_handle);
1508 	
1509 		if (setattrs)
1510 			LogAttrlist(COMPONENT_FSAL, NIV_FULL_DEBUG,
1511 				    "attrs_set ", attrs_set, false);
1512 	
1513 		truncated = (openflags & FSAL_O_TRUNC) != 0;
1514 		LogFullDebug(COMPONENT_FSAL,
1515 			     truncated ? "Truncate" : "No truncate");
1516 	
1517 		/* Now fixup attrs for verifier if exclusive create */
1518 		if (createmode >= FSAL_EXCLUSIVE) {
1519 			if (!setattrs) {
1520 				/* We need to use verifier_attr */
1521 				attrs_set = &verifier_attr;
1522 				memset(&verifier_attr, 0, sizeof(verifier_attr));
1523 			}
1524 	
1525 			set_common_verifier(attrs_set, verifier);
1526 		}
1527 	
1528 		if (name == NULL) {
1529 			/* This is an open by handle */
1530 	#ifdef USE_LTTNG
1531 			tracepoint(fsalmem, mem_open, __func__, __LINE__, obj_hdl,
1532 				   myself->m_name, state, truncated, setattrs);
1533 	#endif
1534 			/* Need a lock to protect the FD */
1535 			PTHREAD_RWLOCK_wrlock(&obj_hdl->obj_lock);
1536 	
1537 			if (state != NULL) {
1538 				/* Prepare to take the share reservation, but only if we
1539 				 * are called with a valid state (if state is NULL the
1540 				 * caller is a stateless create such as NFS v3 CREATE).
1541 				 */
1542 	
1543 				/* Check share reservation conflicts. */
1544 				status = check_share_conflict(&myself->mh_file.share,
1545 							      openflags,
1546 							      false);
1547 	
1548 				if (FSAL_IS_ERROR(status)) {
1549 					PTHREAD_RWLOCK_unlock(&obj_hdl->obj_lock);
1550 					return status;
1551 				}
1552 	
1553 				/* Take the share reservation now by updating the
1554 				 * counters.
1555 				 */
1556 				update_share_counters(&myself->mh_file.share,
1557 						      FSAL_O_CLOSED,
1558 						      openflags);
1559 			} else {
1560 				/* We need to use the global fd to continue, and take
1561 				 * the lock to protect it.
1562 				 */
1563 				my_fd = &myself->mh_file.fd;
1564 			}
1565 	
1566 			if (openflags & FSAL_O_WRITE)
1567 				openflags |= FSAL_O_READ;
1568 			mem_open_my_fd(my_fd, openflags);
1569 	
1570 			if (truncated)
1571 				myself->attrs.filesize = myself->attrs.spaceused = 0;
1572 	
1573 			/* Now check verifier for exclusive, but not for
1574 			 * FSAL_EXCLUSIVE_9P.
1575 			 */
1576 			if (createmode >= FSAL_EXCLUSIVE &&
1577 			    createmode != FSAL_EXCLUSIVE_9P &&
1578 			    !check_verifier_attrlist(&myself->attrs, verifier)) {
1579 				/* Verifier didn't match, return EEXIST */
1580 				status = fsalstat(posix2fsal_error(EEXIST), EEXIST);
1581 			}
1582 	
1583 			if (!FSAL_IS_ERROR(status)) {
1584 				/* Return success. */
1585 				PTHREAD_RWLOCK_unlock(&obj_hdl->obj_lock);
1586 				if (attrs_out != NULL)
1587 					/* Note, myself->attrs is usually protected by a
1588 					 * the attr_lock in MDCACHE.  It's not in this
1589 					 * case.  Since MEM is not a production FSAL,
1590 					 * this is deemed to be okay for the moment.
1591 					 */
1592 					fsal_copy_attrs(attrs_out, &myself->attrs,
1593 							false);
1594 				return status;
1595 			}
1596 	
1597 			(void) mem_close_my_fd(my_fd);
1598 	
1599 			if (state == NULL) {
1600 				/* If no state, release the lock taken above and return
1601 				 * status.
1602 				 */
1603 				PTHREAD_RWLOCK_unlock(&obj_hdl->obj_lock);
1604 				return status;
1605 			}
1606 	
1607 			/* Can only get here with state not NULL and an error */
1608 	
1609 			/* On error we need to release our share reservation
1610 			 * and undo the update of the share counters.
1611 			 * This can block over an I/O operation.
1612 			 */
1613 			update_share_counters(&myself->mh_file.share,
1614 					      openflags,
1615 					      FSAL_O_CLOSED);
1616 	
1617 			PTHREAD_RWLOCK_unlock(&obj_hdl->obj_lock);
1618 			return status;
1619 		}
1620 	
1621 		/* In this path where we are opening by name, we can't check share
1622 		 * reservation yet since we don't have an object_handle yet. If we
1623 		 * indeed create the object handle (there is no race with another
1624 		 * open by name), then there CAN NOT be a share conflict, otherwise
1625 		 * the share conflict will be resolved when the object handles are
1626 		 * merged.
1627 		 */
1628 	
1629 		status = mem_int_lookup(myself, name, &hdl);
1630 		if (FSAL_IS_ERROR(status)) {
1631 			struct fsal_obj_handle *create;
1632 	
1633 			if (status.major != ERR_FSAL_NOENT) {
1634 				/* Actual error from lookup */
1635 				return status;
1636 			}
1637 			/* Doesn't exist, create it */
1638 			status = mem_create_obj(myself, REGULAR_FILE, name, attrs_set,
1639 						&create, attrs_out);
1640 			if (FSAL_IS_ERROR(status)) {
1641 				return status;
1642 			}
1643 			hdl = container_of(create, struct mem_fsal_obj_handle,
1644 					   obj_handle);
1645 			created = true;
1646 		}
1647 	#ifdef USE_LTTNG
1648 		tracepoint(fsalmem, mem_open, __func__, __LINE__, &hdl->obj_handle,
1649 			   hdl->m_name, state, truncated, setattrs);
1650 	#endif
1651 	
1652 		*caller_perm_check = !created;
1653 	
1654 		/* If we didn't have a state above, use the global fd. At this point,
1655 		 * since we just created the global fd, no one else can have a
1656 		 * reference to it, and thus we can mamnipulate unlocked which is
1657 		 * handy since we can then call setattr2 which WILL take the lock
1658 		 * without a double locking deadlock.
1659 		 */
1660 		if (my_fd == NULL)
1661 			my_fd = &hdl->mh_file.fd;
1662 	
1663 		if (openflags & FSAL_O_WRITE)
1664 			openflags |= FSAL_O_READ;
1665 		mem_open_my_fd(my_fd, openflags);
1666 	
1667 		*new_obj = &hdl->obj_handle;
1668 	
1669 		if (!created) {
1670 			/* Create sets and gets attributes, so only do this if not
1671 			 * creating */
1672 			if (setattrs && attrs_set->valid_mask != 0) {
1673 				mem_copy_attrs_mask(attrs_set, &hdl->attrs);
1674 			}
1675 	
1676 			if (attrs_out != NULL) {
1677 				status = (*new_obj)->obj_ops->getattrs(*new_obj,
1678 								      attrs_out);
1679 				if (FSAL_IS_ERROR(status) &&
1680 				    (attrs_out->request_mask & ATTR_RDATTR_ERR) == 0) {
1681 					/* Get attributes failed and caller expected
1682 					 * to get the attributes. Otherwise continue
1683 					 * with attrs_out indicating ATTR_RDATTR_ERR.
1684 					 */
1685 					return status;
1686 				}
1687 			}
1688 		}
1689 	
1690 		if (state != NULL) {
1691 			/* Prepare to take the share reservation, but only if we are
1692 			 * called with a valid state (if state is NULL the caller is
1693 			 * a stateless create such as NFS v3 CREATE).
1694 			 */
1695 	
1696 			/* This can block over an I/O operation. */
1697 			PTHREAD_RWLOCK_wrlock(&(*new_obj)->obj_lock);
1698 	
1699 			/* Take the share reservation now by updating the counters. */
1700 			update_share_counters(&hdl->mh_file.share,
1701 					      FSAL_O_CLOSED,
1702 					      openflags);
1703 	
1704 			PTHREAD_RWLOCK_unlock(&(*new_obj)->obj_lock);
1705 		}
1706 	
1707 		return fsalstat(ERR_FSAL_NO_ERROR, 0);
1708 	}
1709 	
1710 	/**
1711 	 * @brief Re-open a file that may be already opened
1712 	 *
1713 	 * This function supports changing the access mode of a share reservation and
1714 	 * thus should only be called with a share state. The state_lock must be held.
1715 	 *
1716 	 * This MAY be used to open a file the first time if there is no need for
1717 	 * open by name or create semantics. One example would be 9P lopen.
1718 	 *
1719 	 * @param[in] obj_hdl     File on which to operate
1720 	 * @param[in] state       state_t to use for this operation
1721 	 * @param[in] openflags   Mode for re-open
1722 	 *
1723 	 * @return FSAL status.
1724 	 */
1725 	fsal_status_t mem_reopen2(struct fsal_obj_handle *obj_hdl,
1726 				  struct state_t *state,
1727 				  fsal_openflags_t openflags)
1728 	{
1729 		struct mem_fsal_obj_handle *myself =
1730 			container_of(obj_hdl, struct mem_fsal_obj_handle, obj_handle);
1731 		fsal_status_t status = {0, 0};
1732 		struct fsal_fd *my_fd = (struct fsal_fd *)(state + 1);
1733 		fsal_openflags_t old_openflags;
1734 	
1735 	#ifdef USE_LTTNG
1736 		tracepoint(fsalmem, mem_open, __func__, __LINE__, obj_hdl,
1737 				   myself->m_name, state, openflags & FSAL_O_TRUNC,
1738 				   false);
1739 	#endif
1740 	
1741 		old_openflags = my_fd->openflags;
1742 	
1743 		/* This can block over an I/O operation. */
1744 		PTHREAD_RWLOCK_wrlock(&obj_hdl->obj_lock);
1745 	
1746 		/* We can conflict with old share, so go ahead and check now. */
1747 		status = check_share_conflict(&myself->mh_file.share, openflags, false);
1748 		if (FSAL_IS_ERROR(status)) {
1749 			PTHREAD_RWLOCK_unlock(&obj_hdl->obj_lock);
1750 			return status;
1751 		}
1752 	
1753 		/* Set up the new share so we can drop the lock and not have a
1754 		 * conflicting share be asserted, updating the share counters.
1755 		 */
1756 		update_share_counters(&myself->mh_file.share, old_openflags, openflags);
1757 	
1758 		PTHREAD_RWLOCK_unlock(&obj_hdl->obj_lock);
1759 	
1760 		mem_open_my_fd(my_fd, openflags);
1761 		if (openflags & FSAL_O_TRUNC)
1762 			myself->attrs.filesize = myself->attrs.spaceused = 0;
1763 	
1764 		return status;
1765 	}
1766 	
1767 	struct mem_async_arg {
1768 		struct fsal_obj_handle *obj_hdl;
1769 		struct fsal_io_arg *io_arg;
1770 		fsal_async_cb done_cb;
1771 		void *caller_arg;
1772 		struct gsh_export *ctx_export;
1773 		struct fsal_export *fsal_export;
1774 	};
1775 	
1776 	static void
1777 	mem_async_complete(struct fridgethr_context *ctx)
1778 	{
1779 		struct mem_async_arg *async_arg = ctx->arg;
1780 		struct root_op_context root_op_context;
1781 		struct mem_fsal_export *mem_export =
1782 		   container_of(async_arg->fsal_export, struct mem_fsal_export, export);
1783 		uint32_t async_delay = atomic_fetch_uint32_t(&mem_export->async_delay);
1784 	
1785 		/* Now check if we need to delay the call back */
1786 		if (atomic_fetch_uint32_t(&mem_export->async_type) != MEM_FIXED) {
1787 			/* Randomize delay */
(1) Event dont_call: "random" should not be used for security-related applications, because linear congruential algorithms are too easy to break.
(2) Event remediation: Use a compliant random number generator, such as "/dev/random" or "/dev/urandom" on Unix-like systems, and CNG (Cryptography API: Next Generation) on Windows.
1788 			async_delay = random() % async_delay;
1789 		}
1790 	
1791 		if (async_delay != 0) {
1792 			/* Now actually delay call back */
1793 			usleep(async_delay);
1794 		}
1795 	
1796 		/* Need an op context for the call back */
1797 		init_root_op_context(&root_op_context, async_arg->ctx_export,
1798 				     async_arg->fsal_export, 0, 0,
1799 				     UNKNOWN_REQUEST);
1800 	
1801 		async_arg->done_cb(async_arg->obj_hdl, fsalstat(ERR_FSAL_NO_ERROR, 0),
1802 				   async_arg->io_arg, async_arg->caller_arg);
1803 	
1804 		release_root_op_context();
1805 	
1806 		gsh_free(async_arg);
1807 	}
1808 	
1809 	
1810 	/**
1811 	 * @brief Read data from a file
1812 	 *
1813 	 * This function reads data from the given file. The FSAL must be able to
1814 	 * perform the read whether a state is presented or not. This function also
1815 	 * is expected to handle properly bypassing or not share reservations.  This is
1816 	 * an (optionally) asynchronous call.  When the I/O is complete, the done
1817 	 * callback is called with the results.
1818 	 *
1819 	 * @param[in]     obj_hdl	File on which to operate
1820 	 * @param[in]     bypass	If state doesn't indicate a share reservation,
1821 	 *				bypass any deny read
1822 	 * @param[in,out] done_cb	Callback to call when I/O is done
1823 	 * @param[in,out] read_arg	Info about read, passed back in callback
1824 	 * @param[in,out] caller_arg	Opaque arg from the caller for callback
1825 	 *
1826 	 * @return Nothing; results are in callback
1827 	 */
1828 	
1829 	void mem_read2(struct fsal_obj_handle *obj_hdl,
1830 		       bool bypass,
1831 		       fsal_async_cb done_cb,
1832 		       struct fsal_io_arg *read_arg,
1833 		       void *caller_arg)
1834 	{
1835 		struct mem_fsal_obj_handle *myself = container_of(obj_hdl,
1836 					  struct mem_fsal_obj_handle, obj_handle);
1837 		struct fsal_fd *fsal_fd;
1838 		bool has_lock, closefd = false;
1839 		fsal_status_t status = {ERR_FSAL_NO_ERROR, 0};
1840 		bool reusing_open_state_fd = false;
1841 		uint64_t offset = read_arg->offset;
1842 		int i;
1843 		struct mem_fsal_export *mem_export =
1844 		      container_of(op_ctx->fsal_export, struct mem_fsal_export, export);
1845 		uint32_t async_type = atomic_fetch_uint32_t(&mem_export->async_type);
1846 		uint32_t async_stall_delay =
1847 				atomic_fetch_uint32_t(&mem_export->async_stall_delay);
1848 	
1849 		if (read_arg->info != NULL) {
1850 			/* Currently we don't support READ_PLUS */
1851 			done_cb(obj_hdl, fsalstat(ERR_FSAL_NOTSUPP, 0), read_arg,
1852 				caller_arg);
1853 			return;
1854 		}
1855 	
1856 		/* Find an FD */
1857 		status = fsal_find_fd(&fsal_fd, obj_hdl, &myself->mh_file.fd,
1858 				      &myself->mh_file.share, bypass, read_arg->state,
1859 				      FSAL_O_READ, mem_open_func, mem_close_func,
1860 				      &has_lock, &closefd, false,
1861 				      &reusing_open_state_fd);
1862 		if (FSAL_IS_ERROR(status)) {
1863 			done_cb(obj_hdl, status, read_arg, caller_arg);
1864 			return;
1865 		}
1866 	
1867 		read_arg->io_amount = 0;
1868 	
1869 		for (i = 0; i < read_arg->iov_count; i++) {
1870 			size_t bufsize;
1871 	
1872 			if (offset > myself->attrs.filesize) {
1873 				/* Past end of file */
1874 				read_arg->end_of_file = true;
1875 				break;
1876 			}
1877 	
1878 			bufsize = read_arg->iov[i].iov_len;
1879 			if (offset +  bufsize > myself->attrs.filesize) {
1880 				bufsize = myself->attrs.filesize - offset;
1881 			}
1882 			if (offset < myself->datasize) {
1883 				size_t readsize;
1884 	
1885 				/* Data to read */
1886 				readsize = MIN(bufsize, myself->datasize - offset);
1887 				memcpy(read_arg->iov[i].iov_base, myself->data + offset,
1888 				       readsize);
1889 				if (readsize < bufsize)
1890 					memset(read_arg->iov[i].iov_base + readsize,
1891 					       'a', bufsize - readsize);
1892 			} else {
1893 				memset(read_arg->iov[i].iov_base, 'a', bufsize);
1894 			}
1895 			read_arg->io_amount += bufsize;
1896 			offset += bufsize;
1897 		}
1898 	
1899 	#ifdef USE_LTTNG
1900 		tracepoint(fsalmem, mem_read, __func__, __LINE__, obj_hdl,
1901 			   myself->m_name, read_arg->state, myself->attrs.filesize,
1902 			   myself->attrs.spaceused);
1903 	#endif
1904 	
1905 		now(&myself->attrs.atime);
1906 	
1907 		if (has_lock)
1908 			PTHREAD_RWLOCK_unlock(&obj_hdl->obj_lock);
1909 	
1910 		if (async_type > MEM_RANDOM_OR_INLINE ||
1911 		    ((async_type == MEM_RANDOM_OR_INLINE) && ((random() % 1) == 1))) {
1912 			struct mem_async_arg *async_arg;
1913 	
1914 			/* Was MEM_FIXED, MEM_RANDOM, or MEM_RANDOM_OR_INLINE and we
1915 			 * scored a non-inline.
1916 			 */
1917 			async_arg = gsh_malloc(sizeof(*async_arg));
1918 	
1919 			async_arg->obj_hdl = obj_hdl;
1920 			async_arg->io_arg = read_arg;
1921 			async_arg->caller_arg = caller_arg;
1922 			async_arg->done_cb = done_cb;
1923 			async_arg->ctx_export = op_ctx->ctx_export;
1924 			async_arg->fsal_export = op_ctx->fsal_export;
1925 	
1926 			if (fridgethr_submit(mem_async_fridge,
1927 					     mem_async_complete,
1928 					     async_arg) == 0) {
1929 				/* Async fired off... */
1930 				goto out;
1931 			}
1932 	
1933 			/* Could not schedule, fall through and do an immediate
1934 			 * call back.
1935 			 */
1936 			gsh_free(async_arg);
1937 		}
1938 	
1939 		done_cb(obj_hdl, fsalstat(ERR_FSAL_NO_ERROR, 0), read_arg, caller_arg);
1940 	
1941 	out:
1942 	
1943 		if (async_stall_delay > 0) {
1944 			/* We have been asked to stall the calling thread, whether we
1945 			 * issued an inline or async callback.
1946 			 */
1947 			usleep(async_stall_delay);
1948 		}
1949 	}
1950 	
1951 	/**
1952 	 * @brief Write data to a file
1953 	 *
1954 	 * This function writes data to a file. The FSAL must be able to
1955 	 * perform the write whether a state is presented or not. This function also
1956 	 * is expected to handle properly bypassing or not share reservations. Even
1957 	 * with bypass == true, it will enforce a mandatory (NFSv4) deny_write if
1958 	 * an appropriate state is not passed).
1959 	 *
1960 	 * The FSAL is expected to enforce sync if necessary.
1961 	 *
1962 	 * @param[in]     obj_hdl        File on which to operate
1963 	 * @param[in]     bypass         If state doesn't indicate a share reservation,
1964 	 *                               bypass any non-mandatory deny write
1965 	 * @param[in,out] done_cb	Callback to call when I/O is done
1966 	 * @param[in,out] read_arg	Info about read, passed back in callback
1967 	 * @param[in,out] caller_arg	Opaque arg from the caller for callback
1968 	 */
1969 	
1970 	void mem_write2(struct fsal_obj_handle *obj_hdl,
1971 				 bool bypass,
1972 				 fsal_async_cb done_cb,
1973 				 struct fsal_io_arg *write_arg,
1974 				 void *caller_arg)
1975 	{
1976 		struct mem_fsal_obj_handle *myself = container_of(obj_hdl,
1977 					  struct mem_fsal_obj_handle, obj_handle);
1978 		struct fsal_fd *fsal_fd;
1979 		bool has_lock, closefd = false;
1980 		fsal_status_t status = {ERR_FSAL_NO_ERROR, 0};
1981 		bool reusing_open_state_fd = false;
1982 		uint64_t offset = write_arg->offset;
1983 		int i;
1984 		struct mem_fsal_export *mem_export =
1985 		      container_of(op_ctx->fsal_export, struct mem_fsal_export, export);
1986 		uint32_t async_type = atomic_fetch_uint32_t(&mem_export->async_type);
1987 		uint32_t async_stall_delay =
1988 				atomic_fetch_uint32_t(&mem_export->async_stall_delay);
1989 	
1990 		if (obj_hdl->type != REGULAR_FILE) {
1991 			/* Currently can only write to a file */
1992 			done_cb(obj_hdl, fsalstat(ERR_FSAL_INVAL, 0), write_arg,
1993 				caller_arg);
1994 			return;
1995 		}
1996 	
1997 		/* Find an FD */
1998 		status = fsal_find_fd(&fsal_fd, obj_hdl, &myself->mh_file.fd,
1999 				      &myself->mh_file.share, bypass, write_arg->state,
2000 				      FSAL_O_WRITE, mem_open_func, mem_close_func,
2001 				      &has_lock, &closefd, false,
2002 				      &reusing_open_state_fd);
2003 		if (FSAL_IS_ERROR(status)) {
2004 			done_cb(obj_hdl, status, write_arg, caller_arg);
2005 			return;
2006 		}
2007 	
2008 		for (i = 0; i < write_arg->iov_count; i++) {
2009 			size_t bufsize;
2010 	
2011 			bufsize = write_arg->iov[i].iov_len;
2012 			if (offset +  bufsize > myself->attrs.filesize) {
2013 				myself->attrs.filesize = myself->attrs.spaceused =
2014 					offset + bufsize;
2015 			}
2016 			if (offset < myself->datasize) {
2017 				size_t writesize;
2018 	
2019 				/* Data to write */
2020 				writesize = MIN(bufsize, myself->datasize - offset);
2021 				memcpy(myself->data + offset,
2022 				       write_arg->iov[i].iov_base, writesize);
2023 			}
2024 			write_arg->io_amount += bufsize;
2025 			offset += bufsize;
2026 		}
2027 	
2028 	#ifdef USE_LTTNG
2029 		tracepoint(fsalmem, mem_write, __func__, __LINE__, obj_hdl,
2030 				   myself->m_name, write_arg->state,
2031 				   myself->attrs.filesize, myself->attrs.spaceused);
2032 	#endif
2033 	
2034 		/* Update change stats */
2035 		now(&myself->attrs.mtime);
2036 	
2037 		/** @todo FSF - this calculation may be different than what particular
2038 		 *              FSALs use. Is that a problem? Also, above, ctime is used
2039 		 *              not mtime...
2040 		 */
2041 		myself->attrs.change =
2042 			timespec_to_nsecs(&myself->attrs.mtime);
2043 	
2044 		if (has_lock)
2045 			PTHREAD_RWLOCK_unlock(&obj_hdl->obj_lock);
2046 	
2047 		if (async_type > MEM_RANDOM_OR_INLINE ||
2048 		    ((async_type == MEM_RANDOM_OR_INLINE) && ((random() % 1) == 1))) {
2049 			struct mem_async_arg *async_arg;
2050 	
2051 			/* Was MEM_FIXED, MEM_RANDOM, or MEM_RANDOM_OR_INLINE and we
2052 			 * scored a non-inline.
2053 			 */
2054 			async_arg = gsh_malloc(sizeof(*async_arg));
2055 	
2056 			async_arg->obj_hdl = obj_hdl;
2057 			async_arg->io_arg = write_arg;
2058 			async_arg->caller_arg = caller_arg;
2059 			async_arg->done_cb = done_cb;
2060 			async_arg->ctx_export = op_ctx->ctx_export;
2061 			async_arg->fsal_export = op_ctx->fsal_export;
2062 	
2063 			if (fridgethr_submit(mem_async_fridge,
2064 					     mem_async_complete,
2065 					     async_arg) == 0) {
2066 				/* Async fired off... */
2067 				goto out;
2068 			}
2069 	
2070 			/* Could not schedule, fall through and do an immediate
2071 			 * call back.
2072 			 */
2073 			gsh_free(async_arg);
2074 		}
2075 	
2076 		done_cb(obj_hdl, fsalstat(ERR_FSAL_NO_ERROR, 0), write_arg, caller_arg);
2077 	
2078 	out:
2079 	
2080 		if (async_stall_delay > 0) {
2081 			/* We have been asked to stall the calling thread, whether we
2082 			 * issued an inline or async callback.
2083 			 */
2084 			usleep(async_stall_delay);
2085 		}
2086 	}
2087 	
2088 	/**
2089 	 * @brief Commit written data
2090 	 *
2091 	 * This function flushes possibly buffered data to a file. This method
2092 	 * differs from commit due to the need to interact with share reservations
2093 	 * and the fact that the FSAL manages the state of "file descriptors". The
2094 	 * FSAL must be able to perform this operation without being passed a specific
2095 	 * state.
2096 	 *
2097 	 * @param[in] obj_hdl          File on which to operate
2098 	 * @param[in] state            state_t to use for this operation
2099 	 * @param[in] offset           Start of range to commit
2100 	 * @param[in] len              Length of range to commit
2101 	 *
2102 	 * @return FSAL status.
2103 	 */
2104 	
2105 	fsal_status_t mem_commit2(struct fsal_obj_handle *obj_hdl,
2106 				  off_t offset,
2107 				  size_t len)
2108 	{
2109 		return fsalstat(ERR_FSAL_NO_ERROR, 0);
2110 	}
2111 	
2112 	/**
2113 	 * @brief Perform a lock operation
2114 	 *
2115 	 * This function performs a lock operation (lock, unlock, test) on a
2116 	 * file. This method assumes the FSAL is able to support lock owners,
2117 	 * though it need not support asynchronous blocking locks. Passing the
2118 	 * lock state allows the FSAL to associate information with a specific
2119 	 * lock owner for each file (which may include use of a "file descriptor".
2120 	 *
2121 	 * @param[in]  obj_hdl          File on which to operate
2122 	 * @param[in]  state            state_t to use for this operation
2123 	 * @param[in]  owner            Lock owner
2124 	 * @param[in]  lock_op          Operation to perform
2125 	 * @param[in]  request_lock     Lock to take/release/test
2126 	 * @param[out] conflicting_lock Conflicting lock
2127 	 *
2128 	 * @return FSAL status.
2129 	 */
2130 	fsal_status_t mem_lock_op2(struct fsal_obj_handle *obj_hdl,
2131 				   struct state_t *state,
2132 				   void *owner,
2133 				   fsal_lock_op_t lock_op,
2134 				   fsal_lock_param_t *request_lock,
2135 				   fsal_lock_param_t *conflicting_lock)
2136 	{
2137 		struct mem_fsal_obj_handle *myself = container_of(obj_hdl,
2138 					  struct mem_fsal_obj_handle, obj_handle);
2139 		struct fsal_fd fsal_fd = {0}, *fdp = &fsal_fd;
2140 		bool has_lock, closefd = false;
2141 		bool bypass = false;
2142 		fsal_openflags_t openflags;
2143 		fsal_status_t status = {ERR_FSAL_NO_ERROR, 0};
2144 		bool reusing_open_state_fd = false;
2145 	
2146 		if (obj_hdl->type != REGULAR_FILE) {
2147 			/* Currently can only lock a file */
2148 			return fsalstat(ERR_FSAL_INVAL, 0);
2149 		}
2150 	
2151 		switch (lock_op) {
2152 		case FSAL_OP_LOCKT:
2153 			/* We may end up using global fd, don't fail on a deny mode */
2154 			bypass = true;
2155 			openflags = FSAL_O_ANY;
2156 			break;
2157 		case FSAL_OP_LOCK:
2158 			if (request_lock->lock_type == FSAL_LOCK_R)
2159 				openflags = FSAL_O_READ;
2160 			else if (request_lock->lock_type == FSAL_LOCK_W)
2161 				openflags = FSAL_O_WRITE;
2162 			else
2163 				openflags = FSAL_O_RDWR;
2164 			break;
2165 		case FSAL_OP_UNLOCK:
2166 			openflags = FSAL_O_ANY;
2167 			break;
2168 		default:
2169 			LogDebug(COMPONENT_FSAL,
2170 				 "ERROR: The requested lock type was not read or write.");
2171 			return fsalstat(ERR_FSAL_NOTSUPP, 0);
2172 		}
2173 	
2174 		status = fsal_find_fd(&fdp, obj_hdl, &myself->mh_file.fd,
2175 				      &myself->mh_file.share, bypass, state,
2176 				      openflags, mem_open_func, mem_close_func,
2177 				      &has_lock, &closefd, true,
2178 				      &reusing_open_state_fd);
2179 		if (FSAL_IS_ERROR(status)) {
2180 			return status;
2181 		}
2182 	
2183 		if (has_lock)
2184 			PTHREAD_RWLOCK_unlock(&obj_hdl->obj_lock);
2185 	
2186 		return status;
2187 	}
2188 	
2189 	/**
2190 	 * @brief Manage closing a file when a state is no longer needed.
2191 	 *
2192 	 * When the upper layers are ready to dispense with a state, this method is
2193 	 * called to allow the FSAL to close any file descriptors or release any other
2194 	 * resources associated with the state. A call to free_state should be assumed
2195 	 * to follow soon.
2196 	 *
2197 	 * @param[in] obj_hdl    File on which to operate
2198 	 * @param[in] state      state_t to use for this operation
2199 	 *
2200 	 * @return FSAL status.
2201 	 */
2202 	
2203 	fsal_status_t mem_close2(struct fsal_obj_handle *obj_hdl,
2204 				 struct state_t *state)
2205 	{
2206 		struct fsal_fd *my_fd = (struct fsal_fd *)(state + 1);
2207 		struct mem_fsal_obj_handle *myself = container_of(obj_hdl,
2208 					  struct mem_fsal_obj_handle, obj_handle);
2209 		fsal_status_t status;
2210 	
2211 	#ifdef USE_LTTNG
2212 		tracepoint(fsalmem, mem_close, __func__, __LINE__, obj_hdl,
2213 			   myself->m_name, state);
2214 	#endif
2215 	
2216 		if (state->state_type == STATE_TYPE_SHARE ||
2217 		    state->state_type == STATE_TYPE_NLM_SHARE ||
2218 		    state->state_type == STATE_TYPE_9P_FID) {
2219 			/* This is a share state, we must update the share counters */
2220 	
2221 			/* This can block over an I/O operation. */
2222 			PTHREAD_RWLOCK_wrlock(&obj_hdl->obj_lock);
2223 	
2224 			update_share_counters(&myself->mh_file.share,
2225 					      my_fd->openflags,
2226 					      FSAL_O_CLOSED);
2227 	
2228 			PTHREAD_RWLOCK_unlock(&obj_hdl->obj_lock);
2229 		}
2230 	
2231 		status = mem_close_my_fd(my_fd);
2232 	
2233 		return status;
2234 	}
2235 	
2236 	/**
2237 	 * @brief Get the wire version of a handle
2238 	 *
2239 	 * fill in the opaque f/s file handle part.
2240 	 * we zero the buffer to length first.  This MAY already be done above
2241 	 * at which point, remove memset here because the caller is zeroing
2242 	 * the whole struct.
2243 	 *
2244 	 * @param[in] obj_hdl	Handle to digest
2245 	 * @param[in] out_type	Type of digest to get
2246 	 * @param[out] fh_desc	Buffer to write digest into
2247 	 * @return FSAL status
2248 	 */
2249 	static fsal_status_t mem_handle_to_wire(const struct fsal_obj_handle *obj_hdl,
2250 						fsal_digesttype_t output_type,
2251 						struct gsh_buffdesc *fh_desc)
2252 	{
2253 		const struct mem_fsal_obj_handle *myself;
2254 	
2255 		myself = container_of(obj_hdl,
2256 				      const struct mem_fsal_obj_handle,
2257 				      obj_handle);
2258 	
2259 		switch (output_type) {
2260 		case FSAL_DIGEST_NFSV3:
2261 		case FSAL_DIGEST_NFSV4:
2262 			if (fh_desc->len < V4_FH_OPAQUE_SIZE) {
2263 				LogMajor(COMPONENT_FSAL,
2264 					 "Space too small for handle.  need %lu, have %zu",
2265 					 ((unsigned long) V4_FH_OPAQUE_SIZE),
2266 					 fh_desc->len);
2267 				return fsalstat(ERR_FSAL_TOOSMALL, 0);
2268 			}
2269 	
2270 			memcpy(fh_desc->addr, myself->handle, V4_FH_OPAQUE_SIZE);
2271 			fh_desc->len = V4_FH_OPAQUE_SIZE;
2272 			break;
2273 	
2274 		default:
2275 			return fsalstat(ERR_FSAL_SERVERFAULT, 0);
2276 		}
2277 	
2278 		return fsalstat(ERR_FSAL_NO_ERROR, 0);
2279 	}
2280 	
2281 	/**
2282 	 * @brief Get the unique key for a handle
2283 	 *
2284 	 * return a handle descriptor into the handle in this object handle
2285 	 * @TODO reminder.  make sure things like hash keys don't point here
2286 	 * after the handle is released.
2287 	 *
2288 	 * @param[in] obj_hdl	Handle to digest
2289 	 * @param[out] fh_desc	Buffer to write key into
2290 	 */
2291 	static void mem_handle_to_key(struct fsal_obj_handle *obj_hdl,
2292 				  struct gsh_buffdesc *fh_desc)
2293 	{
2294 		struct mem_fsal_obj_handle *myself;
2295 	
2296 		myself = container_of(obj_hdl,
2297 				      struct mem_fsal_obj_handle,
2298 				      obj_handle);
2299 	
2300 		fh_desc->addr = myself->handle;
2301 		fh_desc->len = V4_FH_OPAQUE_SIZE;
2302 	}
2303 	
2304 	/**
2305 	 * @brief Get a ref on a MEM handle
2306 	 *
2307 	 * Stub, for bypass in unit tests
2308 	 *
2309 	 * @param[in] obj_hdl	Handle to ref
2310 	 */
2311 	static void mem_get_ref(struct fsal_obj_handle *obj_hdl)
2312 	{
2313 		struct mem_fsal_obj_handle *myself;
2314 	
2315 		myself = container_of(obj_hdl,
2316 				      struct mem_fsal_obj_handle,
2317 				      obj_handle);
2318 		mem_int_get_ref(myself);
2319 	}
2320 	
2321 	/**
2322 	 * @brief Put a ref on a MEM handle
2323 	 *
2324 	 * Stub, for bypass in unit tests
2325 	 *
2326 	 * @param[in] obj_hdl	Handle to unref
2327 	 */
2328 	static void mem_put_ref(struct fsal_obj_handle *obj_hdl)
2329 	{
2330 		struct mem_fsal_obj_handle *myself;
2331 	
2332 		myself = container_of(obj_hdl,
2333 				      struct mem_fsal_obj_handle,
2334 				      obj_handle);
2335 		mem_int_put_ref(myself);
2336 	}
2337 	
2338 	/**
2339 	 * @brief Release an object handle
2340 	 *
2341 	 * @param[in] obj_hdl	Handle to release
2342 	 */
2343 	static void mem_release(struct fsal_obj_handle *obj_hdl)
2344 	{
2345 		struct mem_fsal_obj_handle *myself;
2346 	
2347 		myself = container_of(obj_hdl,
2348 				      struct mem_fsal_obj_handle,
2349 				      obj_handle);
2350 		mem_int_put_ref(myself);
2351 	}
2352 	
2353 	/**
2354 	 * @brief Merge two handles
2355 	 *
2356 	 * For a failed create, we need to merge the two handles.  If the handles are
2357 	 * the same, we need to ref the handle, so that the following release doesn't
2358 	 * free it.
2359 	 *
2360 	 * @param[in] old_hdl	Handle to merge
2361 	 * @param[in] new_hdl	Handle to merge
2362 	 */
2363 	static fsal_status_t mem_merge(struct fsal_obj_handle *old_hdl,
2364 				       struct fsal_obj_handle *new_hdl)
2365 	{
2366 		fsal_status_t status = {ERR_FSAL_NO_ERROR, 0};
2367 	
2368 		if (old_hdl == new_hdl) {
2369 			/* Nothing to merge */
2370 			return status;
2371 		}
2372 	
2373 		if (old_hdl->type == REGULAR_FILE &&
2374 		    new_hdl->type == REGULAR_FILE) {
2375 			/* We need to merge the share reservations on this file.
2376 			 * This could result in ERR_FSAL_SHARE_DENIED.
2377 			 */
2378 			struct mem_fsal_obj_handle *old, *new;
2379 	
2380 			old = container_of(old_hdl,
2381 					    struct mem_fsal_obj_handle,
2382 					    obj_handle);
2383 			new = container_of(new_hdl,
2384 					    struct mem_fsal_obj_handle,
2385 					    obj_handle);
2386 	
2387 			/* This can block over an I/O operation. */
2388 			PTHREAD_RWLOCK_wrlock(&old_hdl->obj_lock);
2389 	
2390 			status = merge_share(&old->mh_file.share,
2391 					     &new->mh_file.share);
2392 	
2393 			PTHREAD_RWLOCK_unlock(&old_hdl->obj_lock);
2394 		}
2395 	
2396 		return status;
2397 	}
2398 	
2399 	void mem_handle_ops_init(struct fsal_obj_ops *ops)
2400 	{
2401 		fsal_default_obj_ops_init(ops);
2402 	
2403 		ops->get_ref = mem_get_ref,
2404 		ops->put_ref = mem_put_ref,
2405 		ops->merge = mem_merge,
2406 		ops->release = mem_release;
2407 		ops->lookup = mem_lookup;
2408 		ops->readdir = mem_readdir;
2409 		ops->mkdir = mem_mkdir;
2410 		ops->mknode = mem_mknode;
2411 		ops->symlink = mem_symlink;
2412 		ops->readlink = mem_readlink;
2413 		ops->getattrs = mem_getattrs;
2414 		ops->setattr2 = mem_setattr2;
2415 		ops->link = mem_link;
2416 		ops->rename = mem_rename;
2417 		ops->unlink = mem_unlink;
2418 		ops->close = mem_close;
2419 		ops->open2 = mem_open2;
2420 		ops->reopen2 = mem_reopen2;
2421 		ops->read2 = mem_read2;
2422 		ops->write2 = mem_write2;
2423 		ops->commit2 = mem_commit2;
2424 		ops->lock_op2 = mem_lock_op2;
2425 		ops->close2 = mem_close2;
2426 		ops->handle_to_wire = mem_handle_to_wire;
2427 		ops->handle_to_key = mem_handle_to_key;
2428 	}
2429 	
2430 	/* export methods that create object handles
2431 	 */
2432 	
2433 	/* lookup_path
2434 	 * modelled on old api except we don't stuff attributes.
2435 	 * KISS
2436 	 */
2437 	
2438 	fsal_status_t mem_lookup_path(struct fsal_export *exp_hdl,
2439 				      const char *path,
2440 				      struct fsal_obj_handle **obj_hdl,
2441 				      struct attrlist *attrs_out)
2442 	{
2443 		struct mem_fsal_export *mfe;
2444 		struct attrlist attrs;
2445 	
2446 		mfe = container_of(exp_hdl, struct mem_fsal_export, export);
2447 	
2448 		if (strcmp(path, mfe->export_path) != 0) {
2449 			/* Lookup of a path other than the export's root. */
2450 			LogCrit(COMPONENT_FSAL,
2451 				"Attempt to lookup non-root path %s",
2452 				path);
2453 			return fsalstat(ERR_FSAL_NOENT, ENOENT);
2454 		}
2455 	
2456 		attrs.valid_mask = ATTR_MODE;
2457 		attrs.mode = 0755;
2458 	
2459 		if (mfe->root_handle == NULL) {
2460 			mfe->root_handle = mem_alloc_handle(NULL,
2461 							    mfe->export_path,
2462 							    DIRECTORY,
2463 							    mfe,
2464 							    &attrs);
2465 		}
2466 	
2467 		*obj_hdl = &mfe->root_handle->obj_handle;
2468 	
2469 		if (attrs_out != NULL)
2470 			fsal_copy_attrs(attrs_out, &mfe->root_handle->attrs, false);
2471 	
2472 		return fsalstat(ERR_FSAL_NO_ERROR, 0);
2473 	}
2474 	
2475 	/* create_handle
2476 	 * Does what original FSAL_ExpandHandle did (sort of)
2477 	 * returns a ref counted handle to be later used in cache_inode etc.
2478 	 * NOTE! you must release this thing when done with it!
2479 	 * BEWARE! Thanks to some holes in the *AT syscalls implementation,
2480 	 * we cannot get an fd on an AF_UNIX socket, nor reliably on block or
2481 	 * character special devices.  Sorry, it just doesn't...
2482 	 * we could if we had the handle of the dir it is in, but this method
2483 	 * is for getting handles off the wire for cache entries that have LRU'd.
2484 	 * Ideas and/or clever hacks are welcome...
2485 	 */
2486 	
2487 	fsal_status_t mem_create_handle(struct fsal_export *exp_hdl,
2488 					struct gsh_buffdesc *hdl_desc,
2489 					struct fsal_obj_handle **obj_hdl,
2490 					struct attrlist *attrs_out)
2491 	{
2492 		struct glist_head *glist;
2493 		struct fsal_obj_handle *hdl;
2494 		struct mem_fsal_obj_handle *my_hdl;
2495 	
2496 		*obj_hdl = NULL;
2497 	
2498 		if (hdl_desc->len != V4_FH_OPAQUE_SIZE) {
2499 			LogCrit(COMPONENT_FSAL,
2500 				"Invalid handle size %zu expected %lu",
2501 				hdl_desc->len,
2502 				((unsigned long) V4_FH_OPAQUE_SIZE));
2503 	
2504 			return fsalstat(ERR_FSAL_BADHANDLE, 0);
2505 		}
2506 	
2507 		PTHREAD_RWLOCK_rdlock(&exp_hdl->fsal->lock);
2508 	
2509 		glist_for_each(glist, &exp_hdl->fsal->handles) {
2510 			hdl = glist_entry(glist, struct fsal_obj_handle, handles);
2511 	
2512 			my_hdl = container_of(hdl,
2513 					      struct mem_fsal_obj_handle,
2514 					      obj_handle);
2515 	
2516 			if (memcmp(my_hdl->handle,
2517 				   hdl_desc->addr,
2518 				   V4_FH_OPAQUE_SIZE) == 0) {
2519 				LogDebug(COMPONENT_FSAL,
2520 					 "Found hdl=%p name=%s",
2521 					 my_hdl, my_hdl->m_name);
2522 	
2523 	#ifdef USE_LTTNG
2524 				tracepoint(fsalmem, mem_create_handle, __func__,
2525 					   __LINE__, hdl, my_hdl->m_name);
2526 	#endif
2527 				*obj_hdl = hdl;
2528 	
2529 				PTHREAD_RWLOCK_unlock(&exp_hdl->fsal->lock);
2530 	
2531 				if (attrs_out != NULL) {
2532 					fsal_copy_attrs(attrs_out, &my_hdl->attrs,
2533 							false);
2534 				}
2535 	
2536 				return fsalstat(ERR_FSAL_NO_ERROR, 0);
2537 			}
2538 		}
2539 	
2540 		LogDebug(COMPONENT_FSAL,
2541 			"Could not find handle");
2542 	
2543 		PTHREAD_RWLOCK_unlock(&exp_hdl->fsal->lock);
2544 	
2545 		return fsalstat(ERR_FSAL_STALE, ESTALE);
2546 	}
2547