1    	/*
2    	 * vim:noexpandtab:shiftwidth=8:tabstop=8:
3    	 *
4    	 * Copyright 2015-2019 Red Hat, Inc. and/or its affiliates.
5    	 * Author: Daniel Gryniewicz <dang@redhat.com>
6    	 *
7    	 * This program is free software; you can redistribute it and/or
8    	 * modify it under the terms of the GNU Lesser General Public
9    	 * License as published by the Free Software Foundation; either
10   	 * version 3 of the License, or (at your option) any later version.
11   	 *
12   	 * This program is distributed in the hope that it will be useful,
13   	 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14   	 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15   	 * Lesser General Public License for more details.
16   	 *
17   	 * You should have received a copy of the GNU Lesser General Public
18   	 * License along with this library; if not, write to the Free Software
19   	 * Foundation, Inc.,
20   	 * 51 Franklin Street, Fifth Floor, Boston, MA
21   	 * 02110-1301 USA
22   	 *
23   	 */
24   	
25   	/**
26   	 * @addtogroup FSAL_MDCACHE
27   	 * @{
28   	 */
29   	
30   	/**
31   	 * @file  mdcache_helpers.c
32   	 * @brief Miscellaneous helper functions
33   	 */
34   	
35   	#include "config.h"
36   	
37   	#include "sal_functions.h"
38   	#include "fsal.h"
39   	#include "FSAL/fsal_commonlib.h"
40   	#include "fsal_convert.h"
41   	
42   	#include <unistd.h>
43   	#include <sys/types.h>
44   	#include <string.h>
45   	#include <stdbool.h>
46   	
47   	#include "nfs_exports.h"
48   	
49   	#include "mdcache_lru.h"
50   	#include "mdcache_hash.h"
51   	#include "mdcache_avl.h"
52   	#ifdef USE_LTTNG
53   	#include "gsh_lttng/mdcache.h"
54   	#endif
55   	
56   	#define mdc_chunk_first_dirent(c) \
57   		glist_first_entry(&(c)->dirents, mdcache_dir_entry_t, chunk_list)
58   	
59   	static inline bool trust_negative_cache(mdcache_entry_t *parent)
60   	{
61   		bool trust = op_ctx_export_has_option(
62   					  EXPORT_OPTION_TRUST_READIR_NEGATIVE_CACHE) &&
63   			test_mde_flags(parent, MDCACHE_DIR_POPULATED);
64   	
65   		if (trust)
66   			LogFullDebugAlt(COMPONENT_NFS_READDIR, COMPONENT_CACHE_INODE,
67   					"Entry %p Trust negative cache",
68   					parent);
69   		else
70   			LogFullDebugAlt(COMPONENT_NFS_READDIR, COMPONENT_CACHE_INODE,
71   					"Entry %p Don't Trust negative cache",
72   					parent);
73   	
74   		return trust;
75   	}
76   	
77   	/**
78   	 * @brief Add a detached dirent to the LRU list (in the MRU position).
79   	 *
80   	 * If rhe maximum number of detached dirents would be exceeded, remove the
81   	 * LRU dirent.
82   	 *
83   	 * @note mdc_parent MUST have it's content_lock held for writing
84   	 *
85   	 * @param[in]     parent  Parent entry
86   	 * @param[in]     dirent  Dirent to move to MRU
87   	 */
88   	
89   	static inline void add_detached_dirent(mdcache_entry_t *parent,
90   					       mdcache_dir_entry_t *dirent)
91   	{
92   	#ifdef DEBUG_MDCACHE
93   		assert(parent->content_lock.__data.__cur_writer);
94   	#endif
95   		if (parent->fsobj.fsdir.detached_count ==
96   		    mdcache_param.dir.avl_detached_max) {
97   			/* Need to age out oldest detached dirent. */
98   			mdcache_dir_entry_t *removed;
99   	
100  			/* Find the oldest detached dirent and remove it.
101  			 * We just hold the spin lock for the list operation.
102  			 * Technically we don't need it since the content lock is held
103  			 * for write, there can be no conflicting threads. Since we
104  			 * don't have a racing thread, it's ok that the list is
105  			 * unprotected by spin lock while we make the AVL call.
106  			 */
107  			pthread_spin_lock(&parent->fsobj.fsdir.spin);
108  	
109  			removed = glist_last_entry(&parent->fsobj.fsdir.detached,
110  						   mdcache_dir_entry_t,
111  						   chunk_list);
112  	
113  			pthread_spin_unlock(&parent->fsobj.fsdir.spin);
114  	
115  			/* Remove from active names tree */
116  			mdcache_avl_remove(parent, removed);
117  		}
118  	
119  		/* Add new entry to MRU (head) of list */
120  		pthread_spin_lock(&parent->fsobj.fsdir.spin);
121  		glist_add(&parent->fsobj.fsdir.detached, &dirent->chunk_list);
122  		parent->fsobj.fsdir.detached_count++;
123  		pthread_spin_unlock(&parent->fsobj.fsdir.spin);
124  	}
125  	
126  	#define mdcache_alloc_handle(export, sub_handle, fs, reason) \
127  		_mdcache_alloc_handle(export, sub_handle, fs, reason, \
128  				      __func__, __LINE__)
129  	/**
130  	 * Allocate and initialize a new mdcache handle.
131  	 *
132  	 * This function doesn't free the sub_handle if the allocation fails. It must
133  	 * be done in the calling function.
134  	 *
135  	 * @param[in] export The mdcache export used by the handle.
136  	 * @param[in] sub_handle The handle used by the subfsal.
137  	 * @param[in] fs The filesystem of the new handle.
138  	 * @param[in] reason The reason the entry is being inserted
139  	 *
140  	 * @return The new handle, or NULL if the unexport in progress.
141  	 */
142  	static mdcache_entry_t *_mdcache_alloc_handle(
143  			struct mdcache_fsal_export *export,
144  			struct fsal_obj_handle *sub_handle,
145  			struct fsal_filesystem *fs,
146  			mdc_reason_t reason,
147  			const char *func, int line)
148  	{
149  		mdcache_entry_t *result;
150  		fsal_status_t status;
151  	
152  		result = mdcache_lru_get(sub_handle);
153  	
154  		if (result == NULL) {
155  			/* Should never happen, but our caller will handle... */
156  			return NULL;
157  		}
158  	
159  		/* Base data */
160  		result->sub_handle = sub_handle;
161  		result->obj_handle.type = sub_handle->type;
162  		result->obj_handle.fsid = sub_handle->fsid;
163  		result->obj_handle.fileid = sub_handle->fileid;
164  		result->obj_handle.fs = fs;
165  	
166  		/* default handlers */
167  		fsal_obj_handle_init(&result->obj_handle, &export->mfe_exp,
168  				     sub_handle->type);
169  		/* mdcache handlers */
170  		result->obj_handle.obj_ops = &MDCACHE.handle_ops;
171  		/* state */
172  		if (sub_handle->type == DIRECTORY) {
173  			result->obj_handle.state_hdl = &result->fsobj.fsdir.dhdl;
174  			/* init avl tree */
175  			mdcache_avl_init(result);
176  	
177  			/* init chunk list and detached dirents list */
178  			glist_init(&result->fsobj.fsdir.chunks);
179  			glist_init(&result->fsobj.fsdir.detached);
180  			(void) pthread_spin_init(&result->fsobj.fsdir.spin,
181  						 PTHREAD_PROCESS_PRIVATE);
182  		} else {
183  			result->obj_handle.state_hdl = &result->fsobj.hdl;
184  		}
185  		state_hdl_init(result->obj_handle.state_hdl, result->obj_handle.type,
186  			       &result->obj_handle);
187  	
188  		/* Initialize common fields */
189  		result->mde_flags = 0;
190  		glist_init(&result->export_list);
191  		atomic_store_int32_t(&result->first_export_id, -1);
192  	
193  		/* Map the export before we put this entry into the LRU, but after it's
194  		 * well enough set up to be able to be unrefed by unexport should there
195  		 * be a race.
196  		 */
197  		status = mdc_check_mapping(result);
198  	
199  		if (unlikely(FSAL_IS_ERROR(status))) {
200  			/* The current export is in process to be unexported, don't
201  			 * create new mdcache entries.
202  			 */
203  			LogDebug(COMPONENT_CACHE_INODE,
204  				 "Trying to allocate a new entry %p for export id %"
205  				 PRIi16" that is in the process of being unexported",
206  				 result, op_ctx->ctx_export->export_id);
207  			/* sub_handle will be freed by the caller */
208  			result->sub_handle = NULL;
209  			mdcache_put(result);
210  			/* Handle is not yet in hash / LRU, so just put the sentinal
211  			 * ref */
212  			mdcache_put(result);
213  			return NULL;
214  		}
215  	
216  		return result;
217  	}
218  	
219  	/**
220  	 *
221  	 * @brief Cleans up an entry so it can be reused
222  	 *
223  	 * @param[in]  entry     The cache entry to clean
224  	 */
225  	void mdc_clean_entry(mdcache_entry_t *entry)
226  	{
227  		struct glist_head *glist;
228  		struct glist_head *glistn;
229  	
230  		/* Must get attr_lock before mdc_exp_lock */
231  		PTHREAD_RWLOCK_wrlock(&entry->attr_lock);
232  	
233  		glist_for_each_safe(glist, glistn, &entry->export_list) {
234  			struct entry_export_map *expmap;
235  			struct mdcache_fsal_export *export;
236  	
237  			expmap = glist_entry(glist,
238  					     struct entry_export_map,
239  					     export_per_entry);
240  			export = expmap->exp;
241  	
242  			PTHREAD_RWLOCK_wrlock(&export->mdc_exp_lock);
243  	
244  			mdc_remove_export_map(expmap);
245  	
246  			PTHREAD_RWLOCK_unlock(&export->mdc_exp_lock);
247  		}
248  	
249  		/* Clear out first_export */
250  		atomic_store_int32_t(&entry->first_export_id, -1);
251  	
252  		PTHREAD_RWLOCK_unlock(&entry->attr_lock);
253  	
254  		if (entry->obj_handle.type == DIRECTORY) {
255  			PTHREAD_RWLOCK_wrlock(&entry->content_lock);
256  	
257  			/* Clean up dirents */
258  			mdcache_dirent_invalidate_all(entry);
259  			/* Clean up parent key */
260  			mdcache_free_fh(&entry->fsobj.fsdir.parent);
261  	
262  			PTHREAD_RWLOCK_unlock(&entry->content_lock);
263  		}
264  		cih_remove_checked(entry);
265  	
266  	}
267  	
268  	/**
269  	 *
270  	 * Check the active export mapping for this entry and update if necessary.
271  	 *
272  	 * If the entry does not have a mapping for the active export, add one.
273  	 *
274  	 * If an unexport is in progress, return ERR_FSAL_STALE to prevent the caller
275  	 * from proceeding.
276  	 *
277  	 * @param[in]  entry     The cache inode
278  	 *
279  	 * @return FSAL Status
280  	 *
281  	 */
282  	
283  	fsal_status_t mdc_check_mapping(mdcache_entry_t *entry)
284  	{
285  		struct mdcache_fsal_export *export = mdc_cur_export();
286  		struct glist_head *glist;
287  		struct entry_export_map *expmap;
288  		bool try_write = false;
289  	
290  		if (atomic_fetch_uint8_t(&export->flags) & MDC_UNEXPORT) {
291  			/* In the process of unexporting, don't check export mapping.
292  			 * Return a stale error.
293  			 */
294  			return fsalstat(ERR_FSAL_STALE, ESTALE);
295  		}
296  	
297  		/* Fast path check to see if this export is already mapped */
298  		if (atomic_fetch_int32_t(&entry->first_export_id) ==
299  		    (int32_t) op_ctx->ctx_export->export_id)
300  			return fsalstat(ERR_FSAL_NO_ERROR, 0);
301  	
302  		PTHREAD_RWLOCK_rdlock(&entry->attr_lock);
303  	
304  	again:
305  		(void)atomic_inc_uint64_t(&cache_stp->inode_mapping);
306  	
307  		glist_for_each(glist, &entry->export_list) {
308  			expmap = glist_entry(glist, struct entry_export_map,
309  					     export_per_entry);
310  	
311  			/* Found active export on list */
312  			if (expmap->exp == export) {
313  				PTHREAD_RWLOCK_unlock(&entry->attr_lock);
314  				return fsalstat(ERR_FSAL_NO_ERROR, 0);
315  			}
316  		}
317  	
318  		if (!try_write) {
319  			/* Now take write lock and try again in
320  			 * case another thread has raced with us.
321  			 */
322  			PTHREAD_RWLOCK_unlock(&entry->attr_lock);
323  			PTHREAD_RWLOCK_wrlock(&entry->attr_lock);
324  			try_write = true;
325  			goto again;
326  		}
327  	
328  		/* We have the write lock and did not find
329  		 * this export on the list, add it.
330  		 */
331  		PTHREAD_RWLOCK_wrlock(&export->mdc_exp_lock);
332  	
333  		/* Check for unexport again, this prevents an interlock issue where
334  		 * we passed above, but now unexport is in progress. This is required
335  		 * because the various locks are acquired, dropped, and re-acquired
336  		 * in such a way that unexport may have started after we made the
337  		 * check at the top.
338  		 */
339  		if (atomic_fetch_uint8_t(&export->flags) & MDC_UNEXPORT) {
340  			/* In the process of unexporting, don't allow creating a new
341  			 * export mapping. Return a stale error.
342  			 */
343  			PTHREAD_RWLOCK_unlock(&export->mdc_exp_lock);
344  			PTHREAD_RWLOCK_unlock(&entry->attr_lock);
345  			return fsalstat(ERR_FSAL_STALE, ESTALE);
346  		}
347  	
348  		expmap = gsh_calloc(1, sizeof(*expmap));
349  	
350  		/* If export_list is empty, store this export as first */
351  		if (glist_empty(&entry->export_list)) {
352  			atomic_store_int32_t(&entry->first_export_id,
353  					     (int32_t) op_ctx->ctx_export->export_id);
354  		}
355  	
356  		expmap->exp = export;
357  		expmap->entry = entry;
358  	
359  		glist_add_tail(&entry->export_list, &expmap->export_per_entry);
360  		glist_add_tail(&export->entry_list, &expmap->entry_per_export);
361  	
362  		PTHREAD_RWLOCK_unlock(&export->mdc_exp_lock);
363  		PTHREAD_RWLOCK_unlock(&entry->attr_lock);
364  		return fsalstat(ERR_FSAL_NO_ERROR, 0);
365  	}
366  	
367  	/* entry's content_lock must be held in exclusive mode */
368  	fsal_status_t
369  	mdc_get_parent_handle(struct mdcache_fsal_export *export,
370  			      mdcache_entry_t *entry,
371  			      struct fsal_obj_handle *sub_parent)
372  	{
373  		char buf[NFS4_FHSIZE];
374  		struct gsh_buffdesc fh_desc = { buf, NFS4_FHSIZE };
375  		fsal_status_t status;
376  		int32_t expire_time_parent;
377  	
378  	#ifdef DEBUG_MDCACHE
379  		assert(entry->content_lock.__data.__cur_writer);
380  	#endif
381  	
382  		/* Get a wire handle that can be used with create_handle() */
383  		subcall_raw(export,
384  			    status = sub_parent->obj_ops->handle_to_wire(sub_parent,
385  						FSAL_DIGEST_NFSV4, &fh_desc)
386  			   );
387  		if (FSAL_IS_ERROR(status))
388  			return status;
389  	
390  		/* And store in the parent host-handle */
391  		mdcache_copy_fh(&entry->fsobj.fsdir.parent, &fh_desc);
392  	
393  		expire_time_parent = op_ctx->fsal_export->exp_ops.fs_expiretimeparent(
394  								op_ctx->fsal_export);
395  		if (expire_time_parent != -1)
396  			entry->fsobj.fsdir.parent_time = time(NULL) +
397  							 expire_time_parent;
398  		else
399  			entry->fsobj.fsdir.parent_time = 0;
400  	
401  		return fsalstat(ERR_FSAL_NO_ERROR, 0);
402  	}
403  	
404  	/* entry's content_lock must not be held, this function will
405  	get the content_lock in exclusive mode */
406  	void
407  	mdc_get_parent(struct mdcache_fsal_export *export, mdcache_entry_t *entry,
408  		       struct gsh_buffdesc *parent_out)
409  	{
410  		struct fsal_obj_handle *sub_handle = NULL;
411  		fsal_status_t status;
412  	
413  		PTHREAD_RWLOCK_wrlock(&entry->content_lock);
414  	
415  		if (entry->obj_handle.type != DIRECTORY) {
416  			/* Parent pointer only for directories */
417  			goto out;
418  		}
419  	
420  		if (entry->fsobj.fsdir.parent.len != 0) {
421  			/* Already has a parent pointer */
422  			if (entry->fsobj.fsdir.parent_time == 0 ||
423  			    mdcache_is_parent_valid(entry)) {
424  				goto copy_parent_out;
425  			}
426  		}
427  	
428  		subcall_raw(export,
429  			status = entry->sub_handle->obj_ops->lookup(
430  				    entry->sub_handle, "..", &sub_handle, NULL)
431  		       );
432  	
433  		if (FSAL_IS_ERROR(status)) {
434  			/* Top of filesystem */
435  			goto copy_parent_out;
436  		}
437  	
438  		mdcache_free_fh(&entry->fsobj.fsdir.parent);
439  		mdc_get_parent_handle(export, entry, sub_handle);
440  	
441  	copy_parent_out:
442  		if (parent_out != NULL  && entry->fsobj.fsdir.parent.len != 0) {
443  			/* Copy the parent handle to parent_out */
444  			mdcache_copy_fh(parent_out, &entry->fsobj.fsdir.parent);
445  		}
446  	
447  	out:
448  		PTHREAD_RWLOCK_unlock(&entry->content_lock);
449  	
450  		if (sub_handle != NULL) {
451  			/* Release parent handle */
452  			subcall_raw(export,
453  				    sub_handle->obj_ops->release(sub_handle)
454  				   );
455  		}
456  	}
457  	
458  	/**
459  	 * @brief Cleans all the dirents belonging to a directory chunk.
460  	 *
461  	 * @note The content lock MUST be held for write
462  	 *
463  	 * @param[in,out] chunk  The chunk being cleaned.
464  	 *
465  	 */
466  	
467  	void mdcache_clean_dirent_chunk(struct dir_chunk *chunk)
468  	{
469  		struct glist_head *glist, *glistn;
470  		struct mdcache_fsal_obj_handle *parent = chunk->parent;
471  	
472  	#ifdef DEBUG_MDCACHE
473  		assert(parent->content_lock.__data.__cur_writer);
474  	#endif
475  	
476  		glist_for_each_safe(glist, glistn, &chunk->dirents) {
477  			mdcache_dir_entry_t *dirent;
478  	
479  			dirent = glist_entry(glist, mdcache_dir_entry_t, chunk_list);
480  	
481  			/* Remove from deleted or active names tree */
482  			mdcache_avl_remove(parent, dirent);
483  		}
484  	
485  		/* Remove chunk from directory. */
486  		glist_del(&chunk->chunks);
487  	
488  		/* At this point the following is true about the chunk:
489  		 *
490  		 * chunks is {NULL, NULL} do to the glist_del
491  		 * dirents is {&dirents, &dirents}, i.e. empty as a result of the
492  		 *                                  glist_for_each_safe above
493  		 * the other fields are untouched.
494  		 */
495  	
496  		/* This chunk is about to be freed or reused, clean up a few more
497  		 * things.
498  		 */
499  	
500  		chunk->parent = NULL;
501  		chunk->next_ck = 0;
502  		chunk->num_entries = 0;
503  	}
504  	
505  	/**
506  	 * @brief Cleans all the dirent chunks belonging to a directory.
507  	 *
508  	 * @note The content lock MUST be held for write
509  	 *
510  	 * @param[in,out] emtry  The directory being cleaned.
511  	 *
512  	 */
513  	
514  	void mdcache_clean_dirent_chunks(mdcache_entry_t *entry)
515  	{
516  		struct glist_head *glist, *glistn;
517  	
518  	#ifdef DEBUG_MDCACHE
519  		assert(entry->content_lock.__data.__cur_writer);
520  	#endif
521  		glist_for_each_safe(glist, glistn, &entry->fsobj.fsdir.chunks) {
522  			mdcache_lru_unref_chunk(glist_entry(glist, struct dir_chunk,
523  							    chunks));
524  		}
525  	}
526  	
527  	/**
528  	 * @brief Invalidates and releases all cached entries for a directory
529  	 *
530  	 * Invalidates all the entries for a cached directory.
531  	 *
532  	 * @note The content lock MUST be held for write
533  	 *
534  	 * @param[in,out] entry  The directory to be managed
535  	 *
536  	 */
537  	
538  	void mdcache_dirent_invalidate_all(mdcache_entry_t *entry)
539  	{
540  		LogFullDebugAlt(COMPONENT_NFS_READDIR, COMPONENT_CACHE_INODE,
541  				"Invalidating directory for %p, clearing MDCACHE_DIR_POPULATED setting MDCACHE_TRUST_CONTENT and MDCACHE_TRUST_DIR_CHUNKS",
542  				entry);
543  	
544  		/* Clean the chunks first, that will clean most of the active
545  		 * entries also.
546  		 */
547  		mdcache_clean_dirent_chunks(entry);
548  	
549  		/* Clean the active and deleted trees */
550  		mdcache_avl_clean_trees(entry);
551  	
552  		atomic_clear_uint32_t_bits(&entry->mde_flags, MDCACHE_DIR_POPULATED);
553  	
554  		atomic_set_uint32_t_bits(&entry->mde_flags, MDCACHE_TRUST_CONTENT |
555  							    MDCACHE_TRUST_DIR_CHUNKS);
556  	}
557  	
558  	/**
559  	 * @brief Adds a new entry to the cache
560  	 *
561  	 * This function adds a new entry to the cache.  It will allocate
562  	 * entries of any kind.
563  	 *
564  	 * The caller is responsible for releasing attrs_in, however, the references
565  	 * will have been transferred to the new mdcache entry. fsal_copy_attrs leaves
566  	 * the state of the source attributes still safe to call fsal_release_attrs,
567  	 * so all will be well.
568  	 *
569  	 * @param[in]     export         Export for this cache
570  	 * @param[in]     sub_handle     Handle for sub-FSAL
571  	 * @param[in]     attrs_in       Attributes provided for the object
572  	 * @param[in,out] attrs_out      Attributes requested for the object
573  	 * @param[in]     new_directory  Indicate a new directory was created
574  	 * @param[out]    entry          Newly instantiated cache entry
575  	 * @param[in]     state          Optional state_t representing open file.
576  	 *
577  	 * @note This returns an INITIAL ref'd entry on success
578  	 *
579  	 * @return FSAL status
580  	 */
581  	fsal_status_t
582  	mdcache_new_entry(struct mdcache_fsal_export *export,
583  			  struct fsal_obj_handle *sub_handle,
584  			  struct attrlist *attrs_in,
585  			  struct attrlist *attrs_out,
586  			  bool new_directory,
587  			  mdcache_entry_t **entry,
588  			  struct state_t *state,
589  			  mdc_reason_t reason)
590  	{
591  		fsal_status_t status;
592  		mdcache_entry_t *oentry, *nentry = NULL;
593  		struct gsh_buffdesc fh_desc;
594  		cih_latch_t latch;
595  		bool has_hashkey = false;
596  		int rc = 0;
597  		mdcache_key_t key;
598  	
599  		*entry = NULL;
600  	
601  		/* Get FSAL-specific key */
602  		subcall_raw(export,
603  			    sub_handle->obj_ops->handle_to_key(sub_handle, &fh_desc)
604  			   );
605  	
606  		(void) cih_hash_key(&key, export->mfe_exp.sub_export->fsal, &fh_desc,
607  				    CIH_HASH_KEY_PROTOTYPE);
608  	
609  		/* Check if the entry already exists.  We allow the following race
610  		 * because mdcache_lru_get has a slow path, and the latch is a
611  		 * shared lock. */
612  		status = mdcache_find_keyed(&key, entry);
613  		if (!FSAL_IS_ERROR(status)) {
614  			LogDebug(COMPONENT_CACHE_INODE,
615  				 "Trying to add an already existing entry. Found entry %p type: %d, New type: %d",
616  				 *entry, (*entry)->obj_handle.type, sub_handle->type);
617  	
618  			/* If it was unreachable before, mark it reachable */
619  			atomic_clear_uint32_t_bits(&(*entry)->mde_flags,
620  						 MDCACHE_UNREACHABLE);
621  	
622  			/* Don't need a new sub_handle ref */
623  			goto out_no_new_entry_yet;
624  		} else if (status.major != ERR_FSAL_NOENT) {
625  			/* Real error , don't need a new sub_handle ref */
626  			goto out_no_new_entry_yet;
627  		}
628  	
629  		/* !LATCHED */
630  	
631  		/* We did not find the object.  Pull an entry off the LRU. The entry
632  		 * will already be mapped.
633  		 */
634  		nentry = mdcache_alloc_handle(export, sub_handle, sub_handle->fs,
635  					      reason);
636  	
637  		if (nentry == NULL) {
638  			/* We didn't get an entry because of unexport in progress,
639  			 * go ahead and bail out now.
640  			 */
641  			status = fsalstat(ERR_FSAL_STALE, 0);
642  			goto out_no_new_entry_yet;
643  		}
644  	
645  		/* See if someone raced us. */
646  		oentry = cih_get_by_key_latch(&key, &latch, CIH_GET_WLOCK, __func__,
647  						__LINE__);
648  		if (oentry) {
649  			/* Entry is already in the cache, do not add it. */
650  			LogDebug(COMPONENT_CACHE_INODE,
651  				 "lost race to add entry %p type: %d, New type: %d",
652  				 oentry, oentry->obj_handle.type, sub_handle->type);
653  			*entry = oentry;
654  	
655  			/* Ref it */
656  			status = mdcache_lru_ref(*entry, LRU_REQ_INITIAL);
657  			if (!FSAL_IS_ERROR(status)) {
658  				/* We used to return ERR_FSAL_EXIST but all callers
659  				 * just converted that to ERR_FSAL_NO_ERROR, so
660  				 * leave the status alone.
661  				 */
662  				(void)atomic_inc_uint64_t(&cache_stp->inode_conf);
663  			}
664  	
665  			/* It it was unreachable before, mark it reachable */
666  			atomic_clear_uint32_t_bits(&(*entry)->mde_flags,
667  						 MDCACHE_UNREACHABLE);
668  	
669  			/* Release the subtree hash table lock */
670  			cih_hash_release(&latch);
671  	
672  			goto out_release_new_entry;
673  		}
674  	
675  		/* We won the race. */
676  	
677  		/* Set cache key */
678  	
679  		has_hashkey = cih_hash_key(&nentry->fh_hk.key,
680  					   export->mfe_exp.sub_export->fsal,
681  					   &fh_desc, CIH_HASH_NONE);
682  	
683  		if (!has_hashkey) {
684  			cih_hash_release(&latch);
685  			LogCrit(COMPONENT_CACHE_INODE,
686  				"Could not hash new entry");
687  			status = fsalstat(ERR_FSAL_NOMEM, 0);
688  			goto out_release_new_entry;
689  		}
690  	
691  		switch (nentry->obj_handle.type) {
692  		case REGULAR_FILE:
693  			LogDebug(COMPONENT_CACHE_INODE,
694  				 "Adding a REGULAR_FILE, entry=%p", nentry);
695  	
696  			/* Init statistics used for intelligently granting delegations*/
697  			init_deleg_heuristics(&nentry->obj_handle);
698  			break;
699  	
700  		case DIRECTORY:
701  			LogDebugAlt(COMPONENT_NFS_READDIR, COMPONENT_CACHE_INODE,
702  				    "Adding a DIRECTORY, entry=%p setting MDCACHE_TRUST_CONTENT %s",
703  				    nentry, new_directory
704  						? "setting MDCACHE_DIR_POPULATED"
705  						: "clearing MDCACHE_DIR_POPULATED");
706  	
707  			atomic_set_uint32_t_bits(&nentry->mde_flags,
708  						 MDCACHE_TRUST_CONTENT);
709  	
710  			/* If the directory is newly created, it is empty.  Because
711  			   we know its content, we consider it read. */
712  			if (new_directory) {
713  				atomic_set_uint32_t_bits(&nentry->mde_flags,
714  							 MDCACHE_DIR_POPULATED);
715  			} else {
716  				atomic_clear_uint32_t_bits(&nentry->mde_flags,
717  							   MDCACHE_DIR_POPULATED);
718  			}
719  	
720  			break;
721  	
722  		case SYMBOLIC_LINK:
723  		case SOCKET_FILE:
724  		case FIFO_FILE:
725  		case BLOCK_FILE:
726  		case CHARACTER_FILE:
727  			LogDebug(COMPONENT_CACHE_INODE,
728  				 "Adding a special file of type %d entry=%p",
729  				 nentry->obj_handle.type, nentry);
730  			break;
731  	
732  		default:
733  			/* Should never happen */
734  			cih_hash_release(&latch);
735  			status = fsalstat(ERR_FSAL_INVAL, 0);
736  			LogMajor(COMPONENT_CACHE_INODE, "unknown type %u provided",
737  				 nentry->obj_handle.type);
738  			goto out_release_new_entry;
739  		}
740  	
741  		/* nentry not reachable yet; no need to lock */
742  	
743  		/* Copy over the attributes and pass off the ACL reference. We also
744  		 * copy the output attrs at this point to avoid needing the attr_lock.
745  		 */
746  		if (attrs_out != NULL)
747  			fsal_copy_attrs(attrs_out, attrs_in, false);
748  	
749  		/* Use the attrs_in request_mask because it will know if ACL was
750  		 * requested or not (anyone calling mdcache_new_entry will have
751  		 * requested all supported attributes including ACL).
752  		 */
753  		nentry->attrs.request_mask = attrs_in->request_mask;
754  		fsal_copy_attrs(&nentry->attrs, attrs_in, true);
755  	
756  		if (nentry->attrs.expire_time_attr == 0) {
757  			nentry->attrs.expire_time_attr =
758  			    op_ctx->export_perms->expire_time_attr;
759  		}
760  	
761  		/* Validate the attributes we just set. */
762  		mdc_fixup_md(nentry, &nentry->attrs);
763  	
764  		/* Hash and insert entry, after this would need attr_lock to
765  		 * access attributes.
766  		 */
767  		rc = cih_set_latched(nentry, &latch,
768  				     op_ctx->fsal_export->fsal, &fh_desc,
769  				     CIH_SET_UNLOCK | CIH_SET_HASHED);
770  		if (unlikely(rc)) {
771  			LogCrit(COMPONENT_CACHE_INODE,
772  				"entry could not be added to hash, rc=%d", rc);
773  			status = fsalstat(ERR_FSAL_NOMEM, 0);
774  			if (attrs_out != NULL) {
775  				/* Release the attrs we just copied. */
776  				fsal_release_attrs(attrs_out);
777  			}
778  			goto out_release_new_entry;
779  		}
780  	
781  		if (isFullDebug(COMPONENT_CACHE_INODE)) {
782  			char str[LOG_BUFF_LEN] = "\0";
783  			struct display_buffer dspbuf = {sizeof(str), str, str };
784  	
785  			(void) display_mdcache_key(&dspbuf, &nentry->fh_hk.key);
786  	
787  			LogFullDebug(COMPONENT_CACHE_INODE,
788  				     "New entry %p added with fh_hk.key %s",
789  				     nentry, str);
790  		} else {
791  			LogDebug(COMPONENT_CACHE_INODE, "New entry %p added", nentry);
792  		}
793  		mdcache_lru_insert(nentry, reason);
794  		*entry = nentry;
795  		(void)atomic_inc_uint64_t(&cache_stp->inode_added);
796  		return fsalstat(ERR_FSAL_NO_ERROR, 0);
797  	
798  	 out_release_new_entry:
799  	
800  		/* We raced or failed, release the new entry we acquired, this will
801  		 * result in inline deconstruction. This will release the attributes, we
802  		 * may not have copied yet, in which case mask and acl are 0/NULL.  This
803  		 * entry is not yet in the hash or LRU, so just put it's sentinal ref.
804  		 */
805  		nentry->sub_handle = NULL;
806  		mdcache_put(nentry);
807  		mdcache_put(nentry);
808  	
809  	 out_no_new_entry_yet:
810  	
811  		/* If attributes were requested, fetch them now if we still have a
812  		 * success return since we did not actually create a new object and
813  		 * use the provided attributes (we can't trust that the provided
814  		 * attributes are newer).
815  		 *
816  		 * NOTE: There can not be an ABBA lock ordering issue since our caller
817  		 *        does not hold a lock on the "new" entry.
818  		 */
819  		if (!FSAL_IS_ERROR(status) && attrs_out != NULL) {
820  			status = get_optional_attrs(&(*entry)->obj_handle,
821  						    attrs_out);
822  			if (FSAL_IS_ERROR(status)) {
823  				/* Oops, failed to get attributes and ATTR_RDATTR_ERR
824  				 * was not requested, so we are failing and thus must
825  				 * drop the object reference we got.
826  				 */
827  				mdcache_put(*entry);
828  				*entry = NULL;
829  			}
830  		}
831  	
832  		if (!FSAL_IS_ERROR(status)) {
833  			/* Give the FSAL a chance to merge new_obj into
834  			 * oentry->obj_handle since we will be using
835  			 * oentry->obj_handle for all access to the oject.
836  			 */
837  			struct fsal_obj_handle *old_sub_handle = (*entry)->sub_handle;
838  	
839  			subcall_raw(export,
840  				    status =
841  				    old_sub_handle->obj_ops->merge(old_sub_handle,
842  								  sub_handle)
843  				   );
844  	
845  			if (FSAL_IS_ERROR(status)) {
846  				/* Report this error and unref the entry */
847  				LogDebug(COMPONENT_CACHE_INODE,
848  					 "Merge of object handles after race returned %s",
849  					 fsal_err_txt(status));
850  	
851  				mdcache_put(*entry);
852  				*entry = NULL;
853  			}
854  		}
855  	
856  		if (FSAL_IS_ERROR(status) && state != NULL) {
857  			/* Our caller passed in a state for an open file, since
858  			 * there is not a valid entry to use, or a merge failed
859  			 * we must close that file before disposing of new_obj.
860  			 */
861  			fsal_status_t cstatus;
862  	
863  			subcall_raw(export,
864  				    cstatus = sub_handle->obj_ops->close2(sub_handle,
865  									 state)
866  				   );
867  	
868  			LogDebug(COMPONENT_CACHE_INODE,
869  				 "Close of state during error processing returned %s",
870  				 fsal_err_txt(cstatus));
871  		}
872  	
873  		/* must free sub_handle if no new entry was created to reference it. */
874  		subcall_raw(export,
875  			    sub_handle->obj_ops->release(sub_handle)
876  			   );
877  	
878  		return status;
879  	}
880  	
881  	int display_mdcache_key(struct display_buffer *dspbuf, mdcache_key_t *key)
882  	{
883  		int b_left = display_printf(dspbuf, "hk=%"PRIx64" fsal=%p key=",
884  					    key->hk, key->fsal);
885  	
886  		if (b_left <= 0)
887  			return b_left;
888  	
889  		return display_opaque_bytes(dspbuf, key->kv.addr, key->kv.len);
890  	}
891  	
892  	/**
893  	 * @brief Find a cache entry by it's key
894  	 *
895  	 * Lookup a cache entry by key.  If it is not in the cache, it is not returned.
896  	 *
897  	 * @param[in] key	Cache key to use for lookup
898  	 * @param[out] entry	Entry, if found
899  	 * @param[in] reason	The reason for the lookup
900  	 *
901  	 * @note This returns ref'd entry on success, INITIAL if @a reason is not SCAN
902  	 *
903  	 * @return Status
904  	 */
905  	fsal_status_t
906  	mdcache_find_keyed_reason(mdcache_key_t *key, mdcache_entry_t **entry,
907  				  mdc_reason_t reason)
908  	{
909  		cih_latch_t latch;
910  	
911  		if (key->kv.addr == NULL) {
912  			LogDebug(COMPONENT_CACHE_INODE,
913  				 "Attempt to use NULL key");
914  			return fsalstat(ERR_FSAL_INVAL, 0);
915  		}
916  	
917  		if (isFullDebug(COMPONENT_CACHE_INODE)) {
918  			char str[LOG_BUFF_LEN] = "\0";
919  			struct display_buffer dspbuf = { sizeof(str), str, str };
920  	
921  			(void) display_mdcache_key(&dspbuf, key);
922  	
923  			LogFullDebug(COMPONENT_CACHE_INODE,
924  				     "Looking for %s", str);
925  		}
926  	
927  		*entry = cih_get_by_key_latch(key, &latch,
928  						CIH_GET_RLOCK | CIH_GET_UNLOCK_ON_MISS,
929  						__func__, __LINE__);
930  		if (likely(*entry)) {
931  			fsal_status_t status;
932  	
933  			/* Initial Ref on entry */
934  			status = mdcache_lru_ref(*entry, (reason != MDC_REASON_SCAN) ?
935  						 LRU_REQ_INITIAL : LRU_FLAG_NONE);
936  			/* Release the subtree hash table lock */
937  			cih_hash_release(&latch);
938  			if (FSAL_IS_ERROR(status)) {
939  				/* Return error instead of entry */
940  				LogFullDebug(COMPONENT_CACHE_INODE,
941  					     "Found entry %p, but could not ref error %s",
942  					     entry, fsal_err_txt(status));
943  	
944  				*entry = NULL;
945  				return status;
946  			}
947  	
948  			status = mdc_check_mapping(*entry);
949  	
950  			if (unlikely(FSAL_IS_ERROR(status))) {
951  				/* Export is in the process of being removed, don't
952  				 * add this entry to the export, and bail out of the
953  				 * operation sooner than later.
954  				 */
955  				mdcache_put(*entry);
956  				*entry = NULL;
957  				return status;
958  			}
959  	
960  			LogFullDebug(COMPONENT_CACHE_INODE,
961  				     "Found entry %p",
962  				     *entry);
963  	
964  			(void)atomic_inc_uint64_t(&cache_stp->inode_hit);
965  	
966  			return fsalstat(ERR_FSAL_NO_ERROR, 0);
967  		}
968  	
969  		return fsalstat(ERR_FSAL_NOENT, 0);
970  	}
971  	
972  	/**
973  	 * @brief Find or create a cache entry by it's host-handle
974  	 *
975  	 * Locate a cache entry by host-handle.  If it is not in the cache, an attempt
976  	 * will be made to create it and insert it in the cache.
977  	 *
978  	 * @param[in]     key       Cache key to use for lookup
979  	 * @param[in]     export    Export for this cache
980  	 * @param[out]    entry     Entry, if found
981  	 * @param[in,out] attrs_out Optional attributes for newly created object
982  	 *
983  	 * @note This returns an INITIAL ref'd entry on success
984  	 *
985  	 * @return Status
986  	 */
987  	fsal_status_t
988  	mdcache_locate_host(struct gsh_buffdesc *fh_desc,
989  			    struct mdcache_fsal_export *export,
990  			    mdcache_entry_t **entry,
991  			    struct attrlist *attrs_out)
992  	{
993  		struct fsal_export *sub_export = export->mfe_exp.sub_export;
994  		mdcache_key_t key;
995  		struct fsal_obj_handle *sub_handle;
996  		struct attrlist attrs;
997  		fsal_status_t status;
998  	
999  		/* Copy the fh_desc into key, todo: is there a function for this? */
1000 		/* We want to save fh_desc */
1001 		key.kv.len = fh_desc->len;
1002 		key.kv.addr = alloca(key.kv.len);
1003 		memcpy(key.kv.addr, fh_desc->addr, key.kv.len);
1004 		subcall_raw(export,
1005 			    status = sub_export->exp_ops.host_to_key(sub_export,
1006 								     &key.kv)
1007 		       );
1008 	
1009 		if (FSAL_IS_ERROR(status))
1010 			return status;
1011 	
1012 		(void)cih_hash_key(&key, sub_export->fsal, &key.kv,
1013 				    CIH_HASH_KEY_PROTOTYPE);
1014 	
1015 	
1016 		status = mdcache_find_keyed(&key, entry);
1017 	
1018 		if (!FSAL_IS_ERROR(status)) {
1019 			status = get_optional_attrs(&(*entry)->obj_handle, attrs_out);
1020 			return status;
1021 		} else if (status.major != ERR_FSAL_NOENT) {
1022 			/* Actual error */
1023 			return status;
1024 		}
1025 	
1026 		/* Ask for all supported attributes except ACL (we defer fetching ACL
1027 		 * until asked for it (including a permission check).
1028 		 */
1029 		fsal_prepare_attrs(&attrs,
1030 				   op_ctx->fsal_export->exp_ops.fs_supported_attrs(
1031 						op_ctx->fsal_export) & ~ATTR_ACL);
1032 	
1033 		sub_export = export->mfe_exp.sub_export;
1034 	
1035 		subcall_raw(export,
1036 			    status = sub_export->exp_ops.create_handle(sub_export,
1037 								       fh_desc,
1038 								       &sub_handle,
1039 								       &attrs)
1040 		       );
1041 	
1042 		if (unlikely(FSAL_IS_ERROR(status))) {
1043 			LogDebug(COMPONENT_CACHE_INODE,
1044 				 "create_handle failed with %s",
1045 				 fsal_err_txt(status));
1046 			*entry = NULL;
1047 			fsal_release_attrs(&attrs);
1048 			return status;
1049 		}
1050 	
1051 		status = mdcache_new_entry(export, sub_handle, &attrs, attrs_out,
1052 					   false, entry, NULL, MDC_REASON_DEFAULT);
1053 	
1054 		fsal_release_attrs(&attrs);
1055 	
1056 		if (!FSAL_IS_ERROR(status)) {
1057 			LogFullDebug(COMPONENT_CACHE_INODE,
1058 				     "create_handle Created entry %p FSAL %s",
1059 				     (*entry), (*entry)->sub_handle->fsal->name);
1060 		}
1061 	
1062 		return status;
1063 	}
1064 	
1065 	/**
1066 	 * @brief Try to get a cached child
1067 	 *
1068 	 * Get the cached entry child of @a mdc_parent If the cached entry cannot be
1069 	 * found, for whatever reason, return ERR_FSAL_STALE
1070 	 *
1071 	 * @note Caller MUST hold the content_lock for read
1072 	 *
1073 	 * @param[in]     mdc_parent     Parent directory
1074 	 * @param[in]     name           Name of child
1075 	 * @param[out]    entry	         Child entry, on success
1076 	 *
1077 	 * @note This returns an INITIAL ref'd entry on success
1078 	 * @return FSAL status
1079 	 */
1080 	fsal_status_t mdc_try_get_cached(mdcache_entry_t *mdc_parent,
1081 					 const char *name,
1082 					 mdcache_entry_t **entry)
1083 	{
1084 		mdcache_dir_entry_t *dirent = NULL;
1085 		fsal_status_t status = {0, 0};
1086 	
1087 		LogFullDebugAlt(COMPONENT_NFS_READDIR, COMPONENT_CACHE_INODE,
1088 				"Look in cache %s, trust content %s",
1089 				name,
1090 				test_mde_flags(mdc_parent, MDCACHE_TRUST_CONTENT)
1091 					? "yes" : "no");
1092 	
1093 	#ifdef DEBUG_MDCACHE
1094 		assert(mdc_parent->content_lock.__data.__readers ||
1095 		       mdc_parent->content_lock.__data.__cur_writer);
1096 	#endif
1097 		*entry = NULL;
1098 	
1099 		/* If aren't caching dirents, return stale */
1100 		if (mdcache_param.dir.avl_chunk == 0)
1101 			return fsalstat(ERR_FSAL_STALE, 0);
1102 	
1103 		/* If the dirent cache is untrustworthy, don't even ask it */
1104 		if (!test_mde_flags(mdc_parent, MDCACHE_TRUST_CONTENT))
1105 			return fsalstat(ERR_FSAL_STALE, 0);
1106 	
1107 		dirent = mdcache_avl_lookup(mdc_parent, name);
1108 		if (dirent) {
1109 			if (dirent->chunk != NULL) {
1110 				/* Bump the chunk in the LRU */
1111 				lru_bump_chunk(dirent->chunk);
1112 			} else {
1113 				/* Bump the detached dirent. */
1114 				bump_detached_dirent(mdc_parent, dirent);
1115 			}
1116 			status = mdcache_find_keyed(&dirent->ckey, entry);
1117 			if (!FSAL_IS_ERROR(status))
1118 				return status;
1119 			LogFullDebugAlt(COMPONENT_NFS_READDIR, COMPONENT_CACHE_INODE,
1120 					"mdcache_find_keyed %s failed %s",
1121 					name, fsal_err_txt(status));
1122 		} else {	/* ! dirent */
1123 			LogFullDebugAlt(COMPONENT_NFS_READDIR, COMPONENT_CACHE_INODE,
1124 					"mdcache_avl_lookup %s failed trust negative %s",
1125 					name,
1126 					trust_negative_cache(mdc_parent)
1127 						? "yes" : "no");
1128 			if (trust_negative_cache(mdc_parent)) {
1129 				/* If the dirent cache is both fully populated and
1130 				 * valid, it can serve negative lookups. */
1131 				return fsalstat(ERR_FSAL_NOENT, 0);
1132 			}
1133 		}
1134 		return fsalstat(ERR_FSAL_STALE, 0);
1135 	}
1136 	
1137 	/**
1138 	 * @brief Lookup a name (helper)
1139 	 *
1140 	 * Lookup a name relative to another object.  If @a uncached is true and a cache
1141 	 * miss occurs, then the underlying file is looked up and added to the cache, if
1142 	 * it exists.
1143 	 *
1144 	 * The caller will set the request_mask in attrs_out to indicate the attributes
1145 	 * of interest. ATTR_ACL SHOULD NOT be requested and need not be provided. If
1146 	 * not all the requested attributes can be provided, this method MUST return
1147 	 * an error unless the ATTR_RDATTR_ERR bit was set in the request_mask.
1148 	 *
1149 	 * Since this method instantiates a new fsal_obj_handle, it will be forced
1150 	 * to fetch at least some attributes in order to even know what the object
1151 	 * type is (as well as it's fileid and fsid). For this reason, the operation
1152 	 * as a whole can be expected to fail if the attributes were not able to be
1153 	 * fetched.
1154 	 *
1155 	 * @param[in]     parent    Handle of container
1156 	 * @param[in]     name      Name to look up
1157 	 * @param[in]     uncached  If true, do an uncached lookup on cache failure
1158 	 * @param[out]    handle    Handle of found object, on success
1159 	 * @param[in,out] attrs_out Optional attributes for newly created object
1160 	 *
1161 	 * @note This returns an INITIAL ref'd entry on success
1162 	 * @return FSAL status
1163 	 */
1164 	fsal_status_t mdc_lookup(mdcache_entry_t *mdc_parent, const char *name,
1165 				 bool uncached, mdcache_entry_t **new_entry,
1166 				 struct attrlist *attrs_out)
1167 	{
1168 		*new_entry = NULL;
1169 		fsal_status_t status;
1170 	
1171 		LogFullDebugAlt(COMPONENT_NFS_READDIR, COMPONENT_CACHE_INODE,
1172 				"Lookup %s", name);
1173 	
1174 		/* ".." doesn't end up in the cache */
1175 		if (!strcmp(name, "..")) {
1176 			struct mdcache_fsal_export *export = mdc_cur_export();
1177 			struct gsh_buffdesc tmpfh;
1178 	
1179 			LogFullDebugAlt(COMPONENT_NFS_READDIR, COMPONENT_CACHE_INODE,
1180 					"Lookup parent (..) of %p", mdc_parent);
1181 	
1182 			mdc_get_parent(export, mdc_parent, &tmpfh);
1183 	
1184 			status =  mdcache_locate_host(&tmpfh, export, new_entry,
1185 						      attrs_out);
1186 	
1187 			mdcache_free_fh(&tmpfh);
1188 	
1189 			if (status.major == ERR_FSAL_STALE)
1190 				status.major = ERR_FSAL_NOENT;
1191 			return status;
1192 		}
1193 	
1194 		PTHREAD_RWLOCK_rdlock(&mdc_parent->content_lock);
1195 	
1196 		if (mdcache_param.dir.avl_chunk == 0) {
1197 			/* We aren't caching dirents; call directly.
1198 			 * NOTE: Technically we will call mdc_lookup_uncached not
1199 			 *       holding the content_lock write as required, however
1200 			 *       since we are operating uncached here, ultimately there
1201 			 *       will be no addition to the dirent cache, and thus no
1202 			 *       need to hold the write lock.
1203 			 */
1204 			goto uncached;
1205 		}
1206 	
1207 		/* We first try avltree_lookup by name.  If that fails, we dispatch to
1208 		 * the FSAL. */
1209 		status = mdc_try_get_cached(mdc_parent, name, new_entry);
1210 	
1211 		if (status.major == ERR_FSAL_STALE) {
1212 			/* Get a write lock and try again */
1213 			PTHREAD_RWLOCK_unlock(&mdc_parent->content_lock);
1214 	
1215 			LogFullDebugAlt(COMPONENT_NFS_READDIR, COMPONENT_CACHE_INODE,
1216 					"Try again %s", name);
1217 	
1218 			PTHREAD_RWLOCK_wrlock(&mdc_parent->content_lock);
1219 	
1220 			status = mdc_try_get_cached(mdc_parent, name, new_entry);
1221 		}
1222 		if (!FSAL_IS_ERROR(status)) {
1223 			/* Success! Now fetch attr if requested, drop content_lock
1224 			 * to avoid ABBA locking situation.
1225 			 */
1226 			PTHREAD_RWLOCK_unlock(&mdc_parent->content_lock);
1227 	
1228 			LogFullDebugAlt(COMPONENT_NFS_READDIR, COMPONENT_CACHE_INODE,
1229 					"Found, possible getattrs %s (%s)",
1230 					name, attrs_out != NULL ? "yes" : "no");
1231 	
1232 			status = get_optional_attrs(&(*new_entry)->obj_handle,
1233 						    attrs_out);
1234 	
1235 			if (FSAL_IS_ERROR(status)) {
1236 				/* Oops, failed to get attributes and ATTR_RDATTR_ERR
1237 				 * was not requested, so we are failing lookup and
1238 				 * thus must drop the object reference we got.
1239 				 */
1240 				mdcache_put(*new_entry);
1241 				*new_entry = NULL;
1242 			}
1243 			return status;
1244 		} else if (!uncached) {
1245 			/* Was only looking in cache, so don't bother looking further */
1246 			goto out;
1247 		} else if (status.major != ERR_FSAL_STALE) {
1248 			/* Actual failure */
1249 			LogDebugAlt(COMPONENT_NFS_READDIR, COMPONENT_CACHE_INODE,
1250 				    "Lookup %s failed %s",
1251 				    name, fsal_err_txt(status));
1252 			goto out;
1253 		}
1254 	
1255 		/* Need to look up. */
1256 		if (!test_mde_flags(mdc_parent, MDCACHE_TRUST_CONTENT)) {
1257 			/* We have the write lock and the content is
1258 			 * still invalid.  Empty it out and mark it
1259 			 * valid in preparation for caching the result of this lookup.
1260 			 */
1261 			mdcache_dirent_invalidate_all(mdc_parent);
1262 		}
1263 	
1264 		LogDebugAlt(COMPONENT_NFS_READDIR, COMPONENT_CACHE_INODE,
1265 			    "Cache Miss detected for %s", name);
1266 	
1267 	uncached:
1268 		status = mdc_lookup_uncached(mdc_parent, name, new_entry, attrs_out);
1269 	
1270 	out:
1271 		PTHREAD_RWLOCK_unlock(&mdc_parent->content_lock);
1272 		if (status.major == ERR_FSAL_STALE)
1273 			status.major = ERR_FSAL_NOENT;
1274 		return status;
1275 	}
1276 	
1277 	/**
1278 	 * @brief Lookup an uncached entry from the sub-FSAL
1279 	 *
1280 	 * The entry has already been determined to not be cached, and the parent is
1281 	 * already write-locked.  Lookup the child and create a cached entry for it.
1282 	 *
1283 	 * @note mdc_parent MUST have it's content_lock held for writing
1284 	 *
1285 	 * @param[in]     mdc_parent	Parent entry
1286 	 * @param[in]     name		Name of entry to find
1287 	 * @param[out]    new_entry	New entry to return;
1288 	 * @param[in,out] attrs_out     Optional attributes for entry
1289 	 *
1290 	 * @note This returns an INITIAL ref'd entry on success
1291 	 *
1292 	 * @return FSAL status
1293 	 */
1294 	fsal_status_t mdc_lookup_uncached(mdcache_entry_t *mdc_parent,
1295 					  const char *name,
1296 					  mdcache_entry_t **new_entry,
1297 					  struct attrlist *attrs_out)
1298 	{
1299 		struct fsal_obj_handle *sub_handle = NULL, *new_obj = NULL;
1300 		fsal_status_t status;
1301 		struct mdcache_fsal_export *export = mdc_cur_export();
1302 		struct attrlist attrs;
1303 		bool invalidate = false;
1304 	
1305 		/* Ask for all supported attributes except ACL (we defer fetching ACL
1306 		 * until asked for it (including a permission check).
1307 		 */
1308 		fsal_prepare_attrs(&attrs,
1309 				   op_ctx->fsal_export->exp_ops.fs_supported_attrs(
1310 						op_ctx->fsal_export) & ~ATTR_ACL);
1311 	
1312 		subcall(
1313 			status = mdc_parent->sub_handle->obj_ops->lookup(
1314 				    mdc_parent->sub_handle, name, &sub_handle, &attrs)
1315 		       );
1316 	
1317 		if (unlikely(FSAL_IS_ERROR(status))) {
1318 			LogDebugAlt(COMPONENT_NFS_READDIR, COMPONENT_CACHE_INODE,
1319 				    "lookup %s failed with %s",
1320 				    name, fsal_err_txt(status));
1321 			*new_entry = NULL;
1322 			fsal_release_attrs(&attrs);
1323 			return status;
1324 		}
1325 	
1326 		/* We are only called to fill cache, we should not need to invalidate
1327 		 * parents attributes (or dirents if chunked).
1328 		 *
1329 		 * NOTE: This does mean that a pure lookup of a file that had been added
1330 		 *       external to this Ganesha instance could cause us to not dump
1331 		 *       the dirent cache, however, that should still result in an
1332 		 *       attribute change which should dump the cache.
1333 		 */
1334 		status = mdcache_alloc_and_check_handle(export, sub_handle, &new_obj,
1335 							false, &attrs, attrs_out,
1336 							"lookup ", mdc_parent, name,
1337 							&invalidate, NULL);
1338 	
1339 		fsal_release_attrs(&attrs);
1340 	
1341 		if (FSAL_IS_ERROR(status)) {
1342 			*new_entry = NULL;
1343 		} else {
1344 			*new_entry = container_of(new_obj, mdcache_entry_t, obj_handle);
1345 		}
1346 	
1347 		return status;
1348 	}
1349 	
1350 	/**
1351 	 * @brief Lock two directories in order
1352 	 *
1353 	 * This function gets the locks on both entries. If src and dest are
1354 	 * the same, it takes only one lock.  Locks are acquired with lowest
1355 	 * cache_entry first to avoid deadlocks.
1356 	 *
1357 	 * @param[in] src  Source directory to lock
1358 	 * @param[in] dest Destination directory to lock
1359 	 */
1360 	
1361 	void
1362 	mdcache_src_dest_lock(mdcache_entry_t *src, mdcache_entry_t *dest)
1363 	{
1364 		int rc;
1365 	
1366 		/*
1367 		 * A problem found in this order
1368 		 * 1. mdcache_readdir holds A's content_lock, and tries to
1369 		 * grab B's attr_lock.
1370 		 * 2. mdcache_remove holds B's attr_lock, and tries to grab B's
1371 		 * content_lock
1372 		 * 3. mdcache_rename holds B's content_lock, and tries to grab the
1373 		 * A's content_lock (which is held by thread 1).
1374 		 * This change is to avoid this deadlock.
1375 		 */
1376 	
1377 	retry_lock:
1378 		if (src == dest)
1379 			PTHREAD_RWLOCK_wrlock(&src->content_lock);
1380 		else if (src < dest) {
1381 			PTHREAD_RWLOCK_wrlock(&src->content_lock);
1382 			rc = pthread_rwlock_trywrlock(&dest->content_lock);
1383 			if (rc) {
1384 				LogDebugAlt(COMPONENT_NFS_READDIR,
1385 					    COMPONENT_CACHE_INODE,
1386 					    "retry dest %p lock, src %p",
1387 					    dest, src);
1388 				PTHREAD_RWLOCK_unlock(&src->content_lock);
1389 				sleep(1);
1390 				goto retry_lock;
1391 			}
1392 		} else {
1393 			PTHREAD_RWLOCK_wrlock(&dest->content_lock);
1394 			rc = pthread_rwlock_trywrlock(&src->content_lock);
1395 			if (rc) {
1396 				LogDebugAlt(COMPONENT_NFS_READDIR,
1397 					    COMPONENT_CACHE_INODE,
1398 					    "retry src %p lock, dest %p",
1399 					    src, dest);
1400 				PTHREAD_RWLOCK_unlock(&dest->content_lock);
1401 				sleep(1);
1402 				goto retry_lock;
1403 			}
1404 		}
1405 	}
1406 	
1407 	/**
1408 	 * @brief Unlock two directories in order
1409 	 *
1410 	 * This function releases the locks on both entries. If src and dest
1411 	 * are the same, it releases the lock and returns.  Locks are released
1412 	 * with lowest cache_entry first.
1413 	 *
1414 	 * @param[in] src  Source directory to lock
1415 	 * @param[in] dest Destination directory to lock
1416 	 */
1417 	
1418 	void
1419 	mdcache_src_dest_unlock(mdcache_entry_t *src, mdcache_entry_t *dest)
1420 	{
1421 		if (src == dest)
1422 			PTHREAD_RWLOCK_unlock(&src->content_lock);
1423 		else if (src < dest) {
1424 			PTHREAD_RWLOCK_unlock(&dest->content_lock);
1425 			PTHREAD_RWLOCK_unlock(&src->content_lock);
1426 		} else {
1427 			PTHREAD_RWLOCK_unlock(&src->content_lock);
1428 			PTHREAD_RWLOCK_unlock(&dest->content_lock);
1429 		}
1430 	}
1431 	
1432 	/**
1433 	 *
1434 	 * @brief Adds a directory entry to a cached directory.
1435 	 *
1436 	 * This function adds a new directory entry to a directory.  Directory
1437 	 * entries have only weak references, so they do not prevent recycling
1438 	 * or freeing the entry they locate.  This function may be called
1439 	 * either once (for handling creation) or iteratively in directory
1440 	 * population.
1441 	 *
1442 	 * @note Caller MUST hold the content_lock for write and must only call if
1443 	 *       dirents are being cached.
1444 	 *
1445 	 * @param[in,out] parent      Cache entry of the directory being updated
1446 	 * @param[in]     name        The name to add to the entry
1447 	 * @param[in]     entry       The cache entry associated with name
1448 	 * @param[in,out] invalidate  Invalidate the parent directory contents if
1449 	 *                            adding to a chunk fails, if adding to a chunk
1450 	 *                            succeeds, invalidate will be reset to false
1451 	 *                            and the caller MUST refresh the attributes
1452 	 *                            without invalidating the dirent cache.
1453 	 *
1454 	 * @return FSAL status
1455 	 */
1456 	
1457 	fsal_status_t
1458 	mdcache_dirent_add(mdcache_entry_t *parent, const char *name,
1459 			   mdcache_entry_t *entry, bool *invalidate)
1460 	{
1461 		mdcache_dir_entry_t *new_dir_entry, *allocated_dir_entry;
1462 		size_t namesize = strlen(name) + 1;
1463 		int code = 0;
1464 	
1465 		LogFullDebug(COMPONENT_CACHE_INODE, "Add dir entry %s", name);
1466 	
1467 		if (name[0] == '\0') {
1468 			/* An empty dirent name is invalid */
1469 			LogInfo(COMPONENT_CACHE_INODE,
1470 				"Invalid dirent with empty name");
1471 			return fsalstat(ERR_FSAL_INVAL, 0);
1472 		}
1473 	
1474 	#ifdef DEBUG_MDCACHE
1475 		assert(parent->content_lock.__data.__cur_writer);
1476 	#endif
1477 	
1478 		/* in cache avl, we always insert on pentry_parent */
1479 		new_dir_entry = gsh_calloc(1, sizeof(mdcache_dir_entry_t) + namesize);
1480 		new_dir_entry->flags = DIR_ENTRY_FLAG_NONE;
1481 		allocated_dir_entry = new_dir_entry;
1482 	
1483 		memcpy(&new_dir_entry->name_buffer, name, namesize);
1484 		new_dir_entry->name = new_dir_entry->name_buffer;
1485 		mdcache_key_dup(&new_dir_entry->ckey, &entry->fh_hk.key);
1486 	
1487 		/* add to avl */
1488 		code = mdcache_avl_insert(parent, &new_dir_entry);
1489 		if (code < 0) {
1490 			/** @todo: maybe we should actually invalidate the dirent cache
1491 			 *         at this point?
1492 			 *
1493 			 * This indicates an odd condition in the tree, just treat
1494 			 * as an EEXIST condition.
1495 			 */
1496 			LogDebugAlt(COMPONENT_NFS_READDIR, COMPONENT_CACHE_INODE,
1497 				    "Returning EEXIST for %s code %d", name, code);
1498 			return fsalstat(ERR_FSAL_EXIST, 0);
1499 		}
1500 	
1501 		/* we're going to succeed */
1502 		if (new_dir_entry == allocated_dir_entry) {
1503 			/* Place new dirent into a chunk or as detached. */
1504 			place_new_dirent(parent, new_dir_entry);
1505 	
1506 			/* Since we are chunking, we can preserve the dirent cache for
1507 			 * the purposes of lookups even if we could not add the new
1508 			 * dirent to a chunk, so we don't want to invalidate the parent
1509 			 * directory.
1510 			 */
1511 			*invalidate = false;
1512 		}
1513 	
1514 		return fsalstat(ERR_FSAL_NO_ERROR, 0);
1515 	}
1516 	
1517 	/**
1518 	 * @brief Remove a cached directory entry
1519 	 *
1520 	 * @note Caller MUST hold the content_lock for write
1521 	 *
1522 	 * @param[in] parent	Parent directory
1523 	 * @param[in] name	Name to remove
1524 	 */
1525 	void mdcache_dirent_remove(mdcache_entry_t *parent, const char *name)
1526 	{
1527 	#ifdef DEBUG_MDCACHE
1528 		assert(parent->content_lock.__data.__cur_writer);
1529 	#endif
1530 		/* Don't remove if we aren't doing dirent caching or the cache is empty
1531 		 */
1532 		if (mdcache_param.dir.avl_chunk != 0 &&
1533 		    avltree_size(&parent->fsobj.fsdir.avl.t) != 0) {
1534 			mdcache_dir_entry_t *dirent;
1535 	
1536 			LogFullDebugAlt(COMPONENT_NFS_READDIR, COMPONENT_CACHE_INODE,
1537 					"Remove dir entry %s", name);
1538 	
1539 			dirent = mdcache_avl_lookup(parent, name);
1540 	
1541 			if (dirent != NULL)
1542 				avl_dirent_set_deleted(parent, dirent);
1543 		}
1544 	}
1545 	
1546 	/**
1547 	 * @brief State to be passed to FSAL readdir callbacks
1548 	 */
1549 	
1550 	struct mdcache_populate_cb_state {
1551 		struct mdcache_fsal_export *export;
1552 		mdcache_entry_t *dir;
1553 		fsal_status_t *status;
1554 		fsal_readdir_cb cb;
1555 		void *dir_state; /**< For unchunked */
1556 		/** First chunk of this cycle */
1557 		struct dir_chunk *first_chunk;
1558 		/** Current chunk of this cycle */
1559 		struct dir_chunk *cur_chunk;
1560 		/** Chunk previous to cur_chunk, if known */
1561 		struct dir_chunk *prev_chunk;
1562 		/** dirent to be filled in when whence_is_name */
1563 		mdcache_dir_entry_t **dirent;
1564 		/** Cookie is what we are actually searching for */
1565 		fsal_cookie_t cookie;
1566 		/** Indicates if FSAL expects whence to be a name. */
1567 		bool whence_is_name;
1568 		/** If whence_is_name, indicate if we are looking for caller's cookie.
1569 		 */
1570 		bool whence_search;
1571 	};
1572 	
1573 	/**
1574 	 * @brief Handle a readdir callback on uncache dir
1575 	 *
1576 	 * Cache a sindle object, passing it up the stack to the caller.  This is for
1577 	 * handling readdir on a directory that is not being cached, for example because
1578 	 * is is too big.  Dirents are not created by this callback, just objects.
1579 	 *
1580 	 * @param[in]     name       Name of the directory entry
1581 	 * @param[in]     sub_handle Object for entry
1582 	 * @param[in]     attrs      Attributes requested for the object
1583 	 * @param[in,out] dir_state  Callback state
1584 	 * @param[in]     cookie     Directory cookie
1585 	 *
1586 	 * @returns fsal_dir_result
1587 	 */
1588 	
1589 	static enum fsal_dir_result
1590 	mdc_readdir_uncached_cb(const char *name, struct fsal_obj_handle *sub_handle,
1591 				struct attrlist *attrs, void *dir_state,
1592 				fsal_cookie_t cookie)
1593 	{
1594 		struct mdcache_populate_cb_state *state = dir_state;
1595 		fsal_status_t status = { 0, 0 };
1596 		mdcache_entry_t *directory = container_of(&state->dir->obj_handle,
1597 							  mdcache_entry_t, obj_handle);
1598 		mdcache_entry_t *new_entry = NULL;
1599 		enum fsal_dir_result rv;
1600 	
1601 		/* This is in the middle of a subcall. Do a supercall */
1602 		supercall_raw(state->export,
1603 			status = mdcache_new_entry(state->export, sub_handle, attrs,
1604 						   NULL, false, &new_entry, NULL,
1605 						   MDC_REASON_SCAN)
1606 		);
1607 	
1608 		if (FSAL_IS_ERROR(status)) {
1609 			*state->status = status;
1610 			if (status.major == ERR_FSAL_XDEV) {
1611 				LogInfoAlt(COMPONENT_NFS_READDIR, COMPONENT_CACHE_INODE,
1612 					"Ignoring XDEV entry %s", name);
1613 				*state->status = fsalstat(ERR_FSAL_NO_ERROR, 0);
1614 				return DIR_CONTINUE;
1615 			}
1616 			LogInfoAlt(COMPONENT_NFS_READDIR, COMPONENT_CACHE_INODE,
1617 				   "Lookup failed on %s in dir %p with %s",
1618 				   name, directory, fsal_err_txt(*state->status));
1619 			return DIR_TERMINATE;
1620 		}
1621 	
1622 		/* Call up the stack.  Do a supercall */
1623 		supercall_raw(state->export,
1624 			      rv = state->cb(name, &new_entry->obj_handle,
1625 					     &new_entry->attrs, state->dir_state,
1626 					     cookie));
1627 	
1628 		return rv;
1629 	}
1630 	
1631 	/**
1632 	 * Perform an uncached readdir
1633 	 *
1634 	 * Large directories do not have their dirents cached.  This performs readdir on
1635 	 * such directories, by passing the sub-FSAL's results back up through the
1636 	 * stack.
1637 	 *
1638 	 * @note The object passed into the callback is ref'd and must be unref'd by the
1639 	 * callback.
1640 	 *
1641 	 * @param[in] directory the directory to read
1642 	 * @param[in] whence where to start (next)
1643 	 * @param[in] dir_state pass thru of state to callback
1644 	 * @param[in] cb callback function
1645 	 * @param[in] attrmask Which attributes to fill
1646 	 * @param[out] eod_met eod marker true == end of dir
1647 	 *
1648 	 * @return FSAL status
1649 	 */
1650 	fsal_status_t
1651 	mdcache_readdir_uncached(mdcache_entry_t *directory, fsal_cookie_t *whence,
1652 				 void *dir_state, fsal_readdir_cb cb,
1653 				 attrmask_t attrmask, bool *eod_met)
1654 	{
1655 		fsal_status_t status = {0, 0};
1656 		fsal_status_t readdir_status = {0, 0};
1657 		struct mdcache_populate_cb_state state;
1658 	
1659 		state.export = mdc_cur_export();
1660 		state.dir = directory;
1661 		state.status = &status;
1662 		state.cb = cb;
1663 		state.dir_state = dir_state;
1664 	
1665 		subcall(
1666 			readdir_status = directory->sub_handle->obj_ops->readdir(
1667 				directory->sub_handle, whence, &state,
1668 				mdc_readdir_uncached_cb, attrmask, eod_met)
1669 		       );
1670 	
1671 		if (FSAL_IS_ERROR(readdir_status))
1672 			return readdir_status;
1673 	
1674 		return status;
1675 	}
1676 	
1677 	/**
1678 	 * @brief Place a new dirent from create, lookup, or rename into a chunk if
1679 	 * possible, otherwise place as a detached dirent.
1680 	 *
1681 	 * If addition is not possible because the entry does not belong to an
1682 	 * active dirent chunk, nothing happens. The dirent may still be inserted
1683 	 * into the by name lookup as a detached dirent.
1684 	 *
1685 	 * @note If we can't insert the dirent into a chunk because we can't figure
1686 	 * out which chunk it belongs to, we can still trust the chunks, the new dirent
1687 	 * is not within their range, and if inserted between two non-adjacent chunks,
1688 	 * a subsequent readdir that enumerates that part of the directory will pick up
1689 	 * the new dirent since it will have to populate at least one new chunk in the
1690 	 * gap.
1691 	 *
1692 	 * @note parent_dir MUST have it's content_lock held for writing
1693 	 *
1694 	 * @param[in] parent_dir     The directory this dir entry is part of
1695 	 * @param[in] new_dir_entry  The dirent to add.
1696 	 *
1697 	 */
1698 	void place_new_dirent(mdcache_entry_t *parent_dir,
1699 			      mdcache_dir_entry_t *new_dir_entry)
1700 	{
1701 		mdcache_dir_entry_t *left;
1702 		mdcache_dir_entry_t *right;
1703 		struct avltree_node *node, *parent, *unbalanced, *other;
1704 		int is_left, code;
1705 		fsal_cookie_t ck, nck;
1706 		struct dir_chunk *chunk;
1707 		bool invalidate_chunks = true;
1708 	
1709 	#ifdef DEBUG_MDCACHE
1710 		assert(parent_dir->content_lock.__data.__cur_writer);
1711 	#endif
1712 		subcall(
1713 			ck = parent_dir->sub_handle->obj_ops->compute_readdir_cookie(
1714 					parent_dir->sub_handle, new_dir_entry->name)
1715 		       );
1716 	
1717 		if (ck == 0) {
1718 			/* FSAL does not support computing readdir cookie, so we can't
1719 			 * add this entry to a chunk, nor can we trust the chunks.
1720 			 */
1721 			LogFullDebugAlt(COMPONENT_NFS_READDIR, COMPONENT_CACHE_INODE,
1722 					"Could not add %s to chunk for directory %p, compute_readdir_cookie failed",
1723 					new_dir_entry->name, parent_dir);
1724 			goto out;
1725 		}
1726 	
1727 		new_dir_entry->ck = ck;
1728 	
1729 		node = avltree_do_lookup(&new_dir_entry->node_sorted,
1730 					 &parent_dir->fsobj.fsdir.avl.sorted,
1731 					 &parent, &unbalanced, &is_left,
1732 					 avl_dirent_sorted_cmpf);
1733 	
1734 		if (isFullDebug(COMPONENT_CACHE_INODE) ||
1735 		    isFullDebug(COMPONENT_NFS_READDIR)) {
1736 			if (node) {
1737 				right = avltree_container_of(node, mdcache_dir_entry_t,
1738 							     node_sorted);
1739 			}
1740 	
1741 			LogFullDebugAlt(COMPONENT_NFS_READDIR, COMPONENT_CACHE_INODE,
1742 					"avltree_do_lookup returned node=%p (name=%s, ck=%"
1743 					PRIx64") parent=%p unbalanced=%p is_left=%s",
1744 					node,
1745 					node ? right->name : "",
1746 					node ? right->ck : 0,
1747 					parent, unbalanced, is_left ? "true" : "false");
1748 		}
1749 	
1750 		if (node) {
1751 			right = avltree_container_of(node, mdcache_dir_entry_t,
1752 						     node_sorted);
1753 	
1754 			if (ck == FIRST_COOKIE && right->ck == FIRST_COOKIE) {
1755 				/* Special case of inserting a new first entry.
1756 				 * We should only have to do this for FSALs that
1757 				 * sort dirents by cookie value that support
1758 				 * compute_readdir_cookie and are unable to actually
1759 				 * compute the cookie for the very first directory
1760 				 * entry.
1761 				 */
1762 				subcall(
1763 					nck = parent_dir->sub_handle
1764 						->obj_ops->compute_readdir_cookie(
1765 							parent_dir->sub_handle,
1766 							right->name)
1767 				       );
1768 	
1769 				if (nck == 0) {
1770 					/* Oops, could not compute new cookie...
1771 					 * We can no longer trust the chunks.
1772 					 */
1773 					LogCrit(COMPONENT_CACHE_INODE,
1774 						"Could not compute new cookie for %s in directory %p",
1775 						right->name, parent_dir);
1776 					goto out;
1777 				}
1778 	
1779 				/* Just change up the old first entries cookie, which
1780 				 * will leave room to insert the new entry with cookie
1781 				 * of FIRST_COOKIE.
1782 				 */
1783 				right->ck = nck;
1784 			} else {
1785 				/* This should not happen... Let's no longer trust the
1786 				 * chunks.
1787 				 */
1788 				LogCrit(COMPONENT_CACHE_INODE,
1789 					"Could not add %s to chunk for directory %p, node %s found withck=%"
1790 					PRIx64,
1791 					new_dir_entry->name, parent_dir,
1792 					right->name, right->ck);
1793 				goto out;
1794 			}
1795 		}
1796 	
1797 		if (parent == NULL) {
1798 			/* The tree must be empty, there are no chunks to add this
1799 			 * entry to. There are no chunks to trust...
1800 			 */
1801 			LogFullDebugAlt(COMPONENT_NFS_READDIR, COMPONENT_CACHE_INODE,
1802 					"Could not add %s to chunk for directory %p, tree was empty",
1803 					new_dir_entry->name, parent_dir);
1804 			goto out;
1805 		}
1806 	
1807 		if (is_left) {
1808 			/* Parent will be to the right of the key. */
1809 			right = avltree_container_of(parent, mdcache_dir_entry_t,
1810 						     node_sorted);
1811 			other = avltree_prev(parent);
1812 			if (other) {
1813 				left = avltree_container_of(other, mdcache_dir_entry_t,
1814 							    node_sorted);
1815 				LogFullDebugAlt(COMPONENT_NFS_READDIR,
1816 						COMPONENT_CACHE_INODE,
1817 						"%s is between %s and parent %s",
1818 						new_dir_entry->name,
1819 						left->name, right->name);
1820 			} else {
1821 				left = NULL;
1822 	
1823 				if (parent_dir->fsobj.fsdir.first_ck == right->ck) {
1824 					/* The right node is the first entry in the
1825 					 * directory. Add this key to the beginning of
1826 					 * the first chunk and fixup the chunk.
1827 					 */
1828 					LogFullDebugAlt(COMPONENT_NFS_READDIR,
1829 							COMPONENT_CACHE_INODE,
1830 							"Adding %s as new first entry",
1831 							new_dir_entry->name);
1832 				} else {
1833 					/* The right entry is not the first entry in
1834 					 * the directory, so the key is a dirent
1835 					 * somewhere before the first chunked dirent.
1836 					 * we can't insert this key into a chunk,
1837 					 * however, we can still trust the chunks since
1838 					 * the new entry is part of the directory we
1839 					 * don't have cached, a readdir that wants that
1840 					 * part of the directory will populate a new
1841 					 * chunk.
1842 					 */
1843 					LogFullDebugAlt(COMPONENT_NFS_READDIR,
1844 							COMPONENT_CACHE_INODE,
1845 							"Could not add %s to chunk for directory %p, somewhere before first chunk",
1846 							new_dir_entry->name,
1847 							parent_dir);
1848 	
1849 					invalidate_chunks = false;
1850 					goto out;
1851 				}
1852 			}
1853 		} else {
1854 			/* Parent will be to the left of the key. */
1855 			left = avltree_container_of(parent, mdcache_dir_entry_t,
1856 						    node_sorted);
1857 			other = avltree_next(parent);
1858 			if (other) {
1859 				right = avltree_container_of(other, mdcache_dir_entry_t,
1860 							     node_sorted);
1861 				LogFullDebugAlt(COMPONENT_NFS_READDIR,
1862 						COMPONENT_CACHE_INODE,
1863 						"%s is between parent %s and %s",
1864 						new_dir_entry->name,
1865 						left->name, right->name);
1866 			} else {
1867 				right = NULL;
1868 	
1869 				if (left->eod) {
1870 					/* The right node is the last entry in the
1871 					 * directory. Add this key to the end of the
1872 					 * last chunk and fixup the chunk.
1873 					 */
1874 					LogFullDebugAlt(COMPONENT_NFS_READDIR,
1875 							COMPONENT_CACHE_INODE,
1876 							"Adding %s as new last entry",
1877 							new_dir_entry->name);
1878 				} else {
1879 					/* The left entry is not the last entry in
1880 					 * the directory, so the key is a dirent
1881 					 * somewhere after the last chunked dirent.
1882 					 * we can't insert this key into a chunk,
1883 					 * however, we can still trust the chunks since
1884 					 * the new entry is part of the directory we
1885 					 * don't have cached, a readdir that wants that
1886 					 * part of the directory will populate a new
1887 					 * chunk.
1888 					 */
1889 					LogFullDebugAlt(COMPONENT_NFS_READDIR,
1890 							COMPONENT_CACHE_INODE,
1891 							"Could not add %s to chunk for directory %p, somewhere after last chunk",
1892 							new_dir_entry->name,
1893 							parent_dir);
1894 	
1895 					invalidate_chunks = false;
1896 					goto out;
1897 				}
1898 			}
1899 		}
1900 	
1901 		/* Note in the following, every dirent that is in the sorted tree MUST
1902 		 * be in a chunk, so we don't check for chunk != NULL.
1903 		 */
1904 		/* Set up to add to chunk and by cookie AVL tree. */
1905 		if (right == NULL) {
1906 			/* Will go at end of left chunk. */
1907 			chunk = new_dir_entry->chunk = left->chunk;
1908 		} else {
1909 			/* Will go at begin of right chunk. */
1910 			chunk = new_dir_entry->chunk = right->chunk;
1911 		}
1912 	
1913 		code = mdcache_avl_insert_ck(parent_dir, new_dir_entry);
1914 	
1915 		if (code < 0) {
1916 			/* We failed to insert into FSAL cookie AVL tree, will fail.
1917 			 * Nothing to clean up since we haven't done anything
1918 			 * unreversible, and we no longer trust the chunks.
1919 			 */
1920 			goto out;
1921 		}
1922 	
1923 		/* Get the node into the actual tree... */
1924 		avltree_do_insert(&new_dir_entry->node_sorted,
1925 				  &parent_dir->fsobj.fsdir.avl.sorted,
1926 				  parent, unbalanced, is_left);
1927 	
1928 		LogFullDebugAlt(COMPONENT_NFS_READDIR, COMPONENT_CACHE_INODE,
1929 				"Inserted %s into sorted tree left=%p right=%p",
1930 				new_dir_entry->name, new_dir_entry->node_sorted.left,
1931 				new_dir_entry->node_sorted.right);
1932 	
1933 		LogFullDebugAlt(COMPONENT_NFS_READDIR, COMPONENT_CACHE_INODE,
1934 				"Adding %s to chunk %p between %s and %s for directory %p",
1935 				new_dir_entry->name,
1936 				right ? right->chunk : left->chunk,
1937 				left ? left->name : "BEGIN",
1938 				right ? right->name : "END",
1939 				parent_dir);
1940 	
1941 		/* And now add it to the chunk */
1942 		if (right == NULL) {
1943 			/* Insert node at END of the chunk represented by left. */
1944 			glist_add_tail(&left->chunk->dirents,
1945 				       &new_dir_entry->chunk_list);
1946 	
1947 			/* Make the new entry the eod entry. */
1948 			new_dir_entry->eod = true;
1949 			left->eod = false;
1950 		} else {
1951 			/* Insert to left of right, which if left and right are
1952 			 * different chunks, inserts into the right hand chunk.
1953 			 *
1954 			 * NOTE: This looks weird, normally we pass the list head to
1955 			 *       glist_add_tail, but glist_add_tail really just
1956 			 *       inserts the entry before the first parameter, recall
1957 			 *       that the list head is just a member of the list...
1958 			 *
1959 			 * If left == NULL, then the "list node" to the left of
1960 			 * right is the actual list head, and this all works out...
1961 			 */
1962 			glist_add_tail(&right->chunk_list, &new_dir_entry->chunk_list);
1963 	
1964 			if (left != NULL) {
1965 				/* Fixup left chunk's next cookie */
1966 				left->chunk->next_ck = new_dir_entry->ck;
1967 				LogFullDebugAlt(COMPONENT_NFS_READDIR,
1968 						COMPONENT_CACHE_INODE,
1969 						"Fixup next_ck=%"PRIx64,
1970 						left->chunk->next_ck);
1971 			} else {
1972 				/* New first entry in directory */
1973 				LogFullDebugAlt(COMPONENT_NFS_READDIR,
1974 						COMPONENT_CACHE_INODE,
1975 						"Setting directory first_ck=%"PRIx64,
1976 						new_dir_entry->ck);
1977 				parent_dir->fsobj.fsdir.first_ck = new_dir_entry->ck;
1978 			}
1979 		}
1980 	
1981 		/* And now increment the number of entries in the chunk. */
1982 		chunk->num_entries++;
1983 	
1984 		/* And bump the chunk in the LRU */
1985 		lru_bump_chunk(chunk);
1986 	
1987 		if (chunk->num_entries == mdcache_param.dir.avl_chunk_split) {
1988 			/* Create a new chunk */
1989 			struct dir_chunk *split;
1990 			struct glist_head *glist;
1991 			mdcache_dir_entry_t *here = NULL;
1992 			int i = 0;
1993 			uint32_t split_count = mdcache_param.dir.avl_chunk_split / 2;
1994 	
1995 			split = mdcache_get_chunk(parent_dir, chunk, 0);
1996 			split->next_ck = chunk->next_ck;
1997 	
1998 			LogFullDebugAlt(COMPONENT_NFS_READDIR, COMPONENT_CACHE_INODE,
1999 					"Split next_ck=%"PRIx64,
2000 					split->next_ck);
2001 	
2002 			/* Make sure this chunk is in the MRU of L1 */
2003 			lru_bump_chunk(split);
2004 	
2005 			/* Scan the list to find what will be the first dirent in the
2006 			 * new split chunk.
2007 			 */
2008 			glist_for_each(glist, &chunk->dirents) {
2009 				if (++i > (split_count)) {
2010 					/* Got past the halfway point. */
2011 					here = glist_entry(glist,
2012 							   mdcache_dir_entry_t,
2013 							   chunk_list);
2014 					break;
2015 				}
2016 			}
2017 	
2018 			assert(here != NULL);
2019 	
2020 			LogFullDebugAlt(COMPONENT_NFS_READDIR, COMPONENT_CACHE_INODE,
2021 					"Splitting chunk %p for directory %p at %s",
2022 					chunk, parent_dir, here->name);
2023 	
2024 			/* Split chunk->dirents into split->dirents at here */
2025 			glist_split(&chunk->dirents, &split->dirents, glist);
2026 			chunk->num_entries = split_count;
2027 			split->num_entries = split_count;
2028 			chunk->reload_ck = glist_last_entry(&chunk->dirents,
2029 							    mdcache_dir_entry_t,
2030 							    chunk_list)->ck;
2031 	
2032 			/* Update the chunk pointer on all the dirents */
2033 			glist_for_each(glist, &split->dirents) {
2034 				mdcache_dir_entry_t *dirent;
2035 	
2036 				dirent = glist_entry(glist, mdcache_dir_entry_t,
2037 						     chunk_list);
2038 				dirent->chunk = split;
2039 			}
2040 	
2041 			/* Fill in the first chunk's next_ck to be the cookie of the
2042 			 * first dirent in the new split chunk.
2043 			 */
2044 			chunk->next_ck = here->ck;
2045 	
2046 			LogFullDebugAlt(COMPONENT_NFS_READDIR, COMPONENT_CACHE_INODE,
2047 					"Chunk next_ck=%"PRIx64,
2048 					chunk->next_ck);
2049 		}
2050 	
2051 		new_dir_entry->flags |= DIR_ENTRY_SORTED;
2052 		invalidate_chunks = false;
2053 	
2054 	out:
2055 	
2056 		if (invalidate_chunks) {
2057 			/* Indicate we not longer trust the chunk cache. */
2058 			LogFullDebugAlt(COMPONENT_NFS_READDIR, COMPONENT_CACHE_INODE,
2059 					"Entry %p clearing MDCACHE_DIR_POPULATED, MDCACHE_TRUST_DIR_CHUNKS",
2060 					parent_dir);
2061 			atomic_clear_uint32_t_bits(&parent_dir->mde_flags,
2062 						   MDCACHE_DIR_POPULATED |
2063 						   MDCACHE_TRUST_DIR_CHUNKS);
2064 		}
2065 	
2066 		if (new_dir_entry->chunk == NULL) {
2067 			/* This is a detached directory entry, add it to the LRU list of
2068 			 * detached directory entries. This is the one and only place a
2069 			 * detached dirent can be added.
2070 			 */
2071 			add_detached_dirent(parent_dir, new_dir_entry);
2072 		}
2073 	}
2074 	
2075 	/**
2076 	 * @brief Handle adding an element to a dirent chunk
2077 	 *
2078 	 * Cache a sindle object, and add it to the directory chunk in progress.
2079 	 *
2080 	 * @param[in]     name       Name of the directory entry
2081 	 * @param[in]     sub_handle Object for entry
2082 	 * @param[in]     attrs      Attributes requested for the object
2083 	 * @param[in,out] dir_state  Callback state
2084 	 * @param[in]     cookie     Directory cookie
2085 	 *
2086 	 * @returns fsal_dir_result
2087 	 */
2088 	
2089 	static enum fsal_dir_result
2090 	mdc_readdir_chunk_object(const char *name, struct fsal_obj_handle *sub_handle,
2091 				 struct attrlist *attrs_in, void *dir_state,
2092 				 fsal_cookie_t cookie)
2093 	{
2094 		struct mdcache_populate_cb_state *state = dir_state;
2095 		struct dir_chunk *chunk = state->cur_chunk;
2096 		mdcache_entry_t *mdc_parent = container_of(&state->dir->obj_handle,
2097 							   mdcache_entry_t, obj_handle);
2098 		struct mdcache_fsal_export *export = mdc_cur_export();
2099 		mdcache_entry_t *new_entry = NULL;
2100 		mdcache_dir_entry_t *new_dir_entry = NULL, *allocated_dir_entry = NULL;
2101 		size_t namesize = strlen(name) + 1;
2102 		int code = 0;
2103 		fsal_status_t status;
2104 		enum fsal_dir_result result = DIR_CONTINUE;
2105 	
2106 	#ifdef DEBUG_MDCACHE
2107 		assert(mdc_parent->content_lock.__data.__cur_writer);
2108 	#endif
2109 	
2110 		if (chunk->num_entries == mdcache_param.dir.avl_chunk) {
2111 			/* We are being called readahead. */
2112 			LogFullDebugAlt(COMPONENT_NFS_READDIR, COMPONENT_CACHE_INODE,
2113 					"Readdir readahead first entry in new chunk %s",
2114 					name);
2115 	
2116 			state->prev_chunk = chunk;
2117 			state->prev_chunk->next_ck = cookie;
2118 	
2119 			/* Chunk is added to the chunks list before being passed in */
2120 			/* Now start a new chunk, passing this chunk as prev_chunk. */
2121 			chunk = mdcache_get_chunk(chunk->parent, chunk, 0);
2122 	
2123 			state->cur_chunk = chunk;
2124 	
2125 			LogFullDebugAlt(COMPONENT_NFS_READDIR, COMPONENT_CACHE_INODE,
2126 					"Chunk %p Prev chunk %p next_ck=%" PRIx64,
2127 					chunk, state->prev_chunk,
2128 					state->prev_chunk->next_ck);
2129 			/* And start accepting entries into the new chunk. */
2130 		}
2131 	
2132 		LogFullDebugAlt(COMPONENT_NFS_READDIR, COMPONENT_CACHE_INODE,
2133 				"Creating cache entry for %s cookie=0x%"PRIx64
2134 				" sub_handle=0x%p",
2135 				name, cookie, sub_handle);
2136 	
2137 		status = mdcache_new_entry(export, sub_handle, attrs_in, NULL,
2138 					   false, &new_entry, NULL, MDC_REASON_SCAN);
2139 	
2140 		if (FSAL_IS_ERROR(status)) {
2141 			*state->status = status;
2142 			LogInfoAlt(COMPONENT_NFS_READDIR, COMPONENT_CACHE_INODE,
2143 				   "mdcache_new_entry failed on %s in dir %p with %s",
2144 				   name, mdc_parent, fsal_err_txt(status));
2145 			return DIR_TERMINATE;
2146 		}
2147 	
2148 		/* Entry was found in the FSAL, add this entry to the parent directory
2149 		 */
2150 	
2151 		LogFullDebugAlt(COMPONENT_NFS_READDIR, COMPONENT_CACHE_INODE,
2152 				"Add mdcache entry %p for %s for FSAL %s",
2153 				new_entry, name, new_entry->sub_handle->fsal->name);
2154 	
2155 		/* in cache avl, we always insert on mdc_parent */
2156 		new_dir_entry = gsh_calloc(1, sizeof(mdcache_dir_entry_t) + namesize);
2157 		new_dir_entry->flags = DIR_ENTRY_FLAG_NONE;
2158 		new_dir_entry->chunk = chunk;
2159 		new_dir_entry->ck = cookie;
2160 		allocated_dir_entry = new_dir_entry;
2161 	
2162 		/** @todo FSF - we could eventually try and support duplicated FSAL
2163 		 *              cookies assuming they come sequentially (which they
2164 		 *              would from EXT4 as far as I can tell from the EXT4
2165 		 *              code). We could never start a chunk with a duplicate
2166 		 *              so we would have to put all of them into the same
2167 		 *              chunk, posssibly making the chunk larger than normal.
2168 		 */
2169 	
2170 		memcpy(&new_dir_entry->name_buffer, name, namesize);
2171 		new_dir_entry->name = new_dir_entry->name_buffer;
2172 		mdcache_key_dup(&new_dir_entry->ckey, &new_entry->fh_hk.key);
2173 	
2174 		/* add to avl */
2175 		code = mdcache_avl_insert(mdc_parent, &new_dir_entry);
2176 	
2177 		if (code < 0) {
2178 			/* We can get here with the following possibilities:
2179 			 *
2180 			 * - FSAL cookie collision, nothing we can do about this, but
2181 			 *   also really should never happen.
2182 			 * - Name collision, something is broken and the FSAL has
2183 			 *   given us multiple directory entries with the same name
2184 			 *   but for different objects. Again, not much we can do.
2185 			 *
2186 			 * In any case, we will just ignore this entry.
2187 			 */
2188 			mdcache_put(new_entry);
2189 			/* Check for return code -3 and/or -4.
2190 			 * -3: This indicates the file name is duplicate but FSAL
2191 			 * cookie is different. This may happen in case lots of new
2192 			 * entries got added to the directory while running readdir.
2193 			 * -4: This indicates that it is FSAL cookie duplication /
2194 			 * collision. This could happen due to fast mutating directory.
2195 			 * In both cases already cached contents are stale/invalid.
2196 			 * Need to invalidate the cache and inform client to re-read
2197 			 * the directory.
2198 			 */
2199 			if (code == -3 || code == -4) {
2200 				atomic_clear_uint32_t_bits(&state->dir->mde_flags,
2201 							   MDCACHE_TRUST_CONTENT);
2202 				state->status->major = ERR_FSAL_DELAY;
2203 				state->status->minor = 0;
2204 				return DIR_TERMINATE;
2205 			}
2206 			LogCrit(COMPONENT_CACHE_INODE,
2207 				"Collision while adding dirent for %s", name);
2208 			return DIR_CONTINUE;
2209 		}
2210 	
2211 		/* Note that if this dirent was already in the lookup by name AVL
2212 		 * tree (mdc_parent->fsobj.fsdir.avl.t), then mdcache_avl_qp_insert
2213 		 * freed the dirent we allocated above, and returned the one that was
2214 		 * in tree. It will have set chunk, ck, and nk.
2215 		 *
2216 		 * The existing dirent might or might not be part of a chunk already.
2217 		 */
2218 	
2219 		if (new_dir_entry != allocated_dir_entry) {
2220 			LogFullDebugAlt(COMPONENT_NFS_READDIR, COMPONENT_CACHE_INODE,
2221 					"Swapped %s using %p instead of %p, new_dir_entry->chunk=%p chunk=%p",
2222 					new_dir_entry->name, new_dir_entry,
2223 					allocated_dir_entry, new_dir_entry->chunk,
2224 					chunk);
2225 		}
2226 	
2227 		assert(new_dir_entry->chunk);
2228 	
2229 		if (state->whence_search && new_dir_entry->ck == state->cookie) {
2230 			/* We have found the dirent the caller is looking for. */
2231 			LogFullDebugAlt(COMPONENT_NFS_READDIR, COMPONENT_CACHE_INODE,
2232 					"Found dirent %s caller is looking for cookie = %"
2233 					PRIx64, name, state->cookie);
2234 			*(state->dirent) = new_dir_entry;
2235 		}
2236 	
2237 		if (op_ctx->fsal_export->exp_ops.fs_supports(
2238 				op_ctx->fsal_export, fso_compute_readdir_cookie)) {
2239 			struct avltree_node *node;
2240 	
2241 			node = avltree_inline_insert(
2242 						&new_dir_entry->node_sorted,
2243 						&mdc_parent->fsobj.fsdir.avl.sorted,
2244 						avl_dirent_sorted_cmpf);
2245 	
2246 			if (node != NULL) {
2247 				if (node == &new_dir_entry->node_sorted) {
2248 					LogDebugAlt(COMPONENT_NFS_READDIR,
2249 						    COMPONENT_CACHE_INODE,
2250 						    "New entry %s was already in sorted tree",
2251 						    name);
2252 				} else if (isDebug(COMPONENT_CACHE_INODE) ||
2253 					   isDebug(COMPONENT_NFS_READDIR)) {
2254 					mdcache_dir_entry_t *other;
2255 	
2256 					other = avltree_container_of(
2257 						node, mdcache_dir_entry_t, node_sorted);
2258 					LogDebugAlt(COMPONENT_NFS_READDIR,
2259 						    COMPONENT_CACHE_INODE,
2260 						    "New entry %s collided with entry %s already in sorted tree",
2261 						    name, other->name);
2262 				}
2263 			} else {
2264 				LogFullDebugAlt(COMPONENT_NFS_READDIR,
2265 						COMPONENT_CACHE_INODE,
2266 						"Inserted %s into sorted tree left=%p right=%p",
2267 						name, new_dir_entry->node_sorted.left,
2268 						new_dir_entry->node_sorted.right);
2269 	
2270 				new_dir_entry->flags |= DIR_ENTRY_SORTED;
2271 			}
2272 		}
2273 	
2274 		/* Add this dirent to the chunk if not already added. */
2275 		if (glist_null(&new_dir_entry->chunk_list)) {
2276 			/* If this dirent is not already on a chunk_list, then we add
2277 			 * it. It could be the allocated_dir_entry or it could be an
2278 			 * old dirent that was not part of a chunk, but it is NOT the
2279 			 * same dirent that was already part of some other chunk.
2280 			 */
2281 			glist_add_tail(&chunk->dirents, &new_dir_entry->chunk_list);
2282 			chunk->num_entries++;
2283 		}
2284 	
2285 		if (new_dir_entry->chunk != chunk) {
2286 			/* We have the situation where we have collided with a
2287 			 * previously used chunk (and thus we have a partial chunk).
2288 			 * Since dirent is pointing to the existing dirent and the one
2289 			 * we allocated above has been freed we don't need to do any
2290 			 * cleanup.
2291 			 *
2292 			 * Don't allow readahead in this case just indicate this
2293 			 * directory is terminated.
2294 			 */
2295 			result = DIR_TERMINATE;
2296 	
2297 			LogFullDebugAlt(COMPONENT_NFS_READDIR, COMPONENT_CACHE_INODE,
2298 					"Collision old chunk %p next_ck=%"PRIx64
2299 					" new chunk %p next_ck=%"PRIx64,
2300 					new_dir_entry->chunk,
2301 					new_dir_entry->chunk->next_ck, chunk,
2302 					chunk->next_ck);
2303 			if (chunk->num_entries == 0) {
2304 	
2305 				LogFullDebugAlt(COMPONENT_NFS_READDIR,
2306 					    COMPONENT_CACHE_INODE,
2307 					    "Nuking empty Chunk %p", chunk);
2308 				/* We read-ahead into an existing chunk, and this chunk
2309 				 * is empty.  Just ditch it now, to avoid any issue. */
2310 				mdcache_lru_unref_chunk(chunk);
2311 				if (state->first_chunk == chunk) {
2312 					/* Drop the first_chunk ref */
2313 					mdcache_lru_unref_chunk(state->first_chunk);
2314 					state->first_chunk = new_dir_entry->chunk;
2315 					/* And take the first_chunk ref */
2316 					mdcache_lru_ref_chunk(state->first_chunk);
2317 				}
2318 				chunk = new_dir_entry->chunk;
2319 				state->cur_chunk = chunk;
2320 				if (new_dir_entry->entry) {
2321 					/* This was ref'd already; drop extra ref */
2322 					mdcache_put(new_dir_entry->entry);
2323 					new_dir_entry->entry = NULL;
2324 				}
2325 				if (state->prev_chunk) {
2326 					state->prev_chunk->next_ck = new_dir_entry->ck;
2327 				}
2328 			} else {
2329 				LogFullDebugAlt(COMPONENT_NFS_READDIR,
2330 						COMPONENT_CACHE_INODE,
2331 						"keeping non-empty Chunk %p", chunk);
2332 				chunk->next_ck = new_dir_entry->ck;
2333 			}
2334 		} else if (chunk->num_entries == mdcache_param.dir.avl_chunk) {
2335 			/* Chunk is full. Since dirent is pointing to the existing
2336 			 * dirent and the one we allocated above has been freed we don't
2337 			 * need to do any cleanup.
2338 			 *
2339 			 * Allow readahead.
2340 			 *
2341 			 * If there's actually any readahead, chunk->next_ck will get
2342 			 * filled in.
2343 			 */
2344 			result = DIR_READAHEAD;
2345 		}
2346 	
2347 		if (new_entry->obj_handle.type == DIRECTORY) {
2348 			/* Insert Parent's key */
2349 			PTHREAD_RWLOCK_wrlock(&new_entry->content_lock);
2350 			mdc_dir_add_parent(new_entry, mdc_parent);
2351 			PTHREAD_RWLOCK_unlock(&new_entry->content_lock);
2352 		}
2353 	
2354 		LogFullDebugAlt(COMPONENT_NFS_READDIR, COMPONENT_CACHE_INODE,
2355 				"About to put entry %p refcnt=%"PRIi32,
2356 				new_entry,
2357 				atomic_fetch_int32_t(&new_entry->lru.refcnt));
2358 	
2359 		/* Note that this entry is ref'd, so that mdcache_readdir_chunked can
2360 		 * un-ref it.  Pass this ref off to the dir_entry for this purpose. */
2361 		assert(!new_dir_entry->entry);
2362 		new_dir_entry->entry = new_entry;
2363 	
2364 		return result;
2365 	}
2366 	
2367 	/**
2368 	 * @brief Handle a readdir callback for a chunked directory.
2369 	 *
2370 	 * This is a supercall wrapper around the function above that actually does
2371 	 * the work.
2372 	 *
2373 	 * @param[in]     name       Name of the directory entry
2374 	 * @param[in]     sub_handle Object for entry
2375 	 * @param[in]     attrs      Attributes requested for the object
2376 	 * @param[in,out] dir_state  Callback state
2377 	 * @param[in]     cookie     Directory cookie
2378 	 *
2379 	 * @returns fsal_dir_result
2380 	 */
2381 	
2382 	static enum fsal_dir_result
2383 	mdc_readdir_chunked_cb(const char *name, struct fsal_obj_handle *sub_handle,
2384 			       struct attrlist *attrs, void *dir_state,
2385 			       fsal_cookie_t cookie)
2386 	{
2387 		struct mdcache_populate_cb_state *state = dir_state;
2388 		enum fsal_dir_result result;
2389 	
2390 		/* This is in the middle of a subcall. Do a supercall */
2391 		supercall_raw(state->export,
2392 			result = mdc_readdir_chunk_object(name, sub_handle, attrs,
2393 							  dir_state, cookie)
2394 		);
2395 	
2396 		return result;
2397 	}
2398 	
2399 	/**
2400 	 * @brief Skip directory chunks while re-filling dirent cache in search of
2401 	 *        a specific cookie that is not in cache.
2402 	 *
2403 	 * @note The content lock MUST be held for write
2404 	 *
2405 	 * @param[in] directory  The directory being read
2406 	 * @param[in] next_ck    The next cookie to find the next chunk
2407 	 *
2408 	 * @returns The chunk found, or NULL.
2409 	 */
2410 	static struct dir_chunk *mdcache_skip_chunks(mdcache_entry_t *directory,
2411 						     fsal_cookie_t next_ck)
2412 	{
2413 		mdcache_dir_entry_t *dirent = NULL;
2414 		struct dir_chunk *chunk = NULL;
2415 	
2416 		/* We need to skip chunks that are already cached. */
2417 		while (next_ck != 0 &&
2418 		       mdcache_avl_lookup_ck(directory, next_ck, &dirent)) {
2419 			chunk = dirent->chunk;
2420 			mdcache_lru_unref_chunk(chunk);
2421 			next_ck = chunk->next_ck;
2422 		}
2423 	
2424 		/* At this point, we have the last cached chunk before a gap. */
2425 		return chunk;
2426 	}
2427 	
2428 	/**
2429 	 * @brief Read the next chunk of a directory
2430 	 *
2431 	 * If called for an FSAL that only supports whence as the dirent name to
2432 	 * continue from, and prev_chunk is NULL, we must scan the directory from the
2433 	 * beginning. If prev_chunk is not NULL, we can scan the directory starting with
2434 	 * the last dirent name in prev_chunk, but we must still scan the directory
2435 	 * until we find whence.
2436 	 *
2437 	 * @note this returns a ref on the chunk containing @a dirent
2438 	 *
2439 	 * @param[in] directory   The directory to read
2440 	 * @param[in] whence      Where to start (next)
2441 	 * @param[in,out] dirent  The first dirent of the chunk
2442 	 * @param[in] prev_chunk  The previous chunk populated
2443 	 * @param[in,out] eod_met The end of directory has been hit.
2444 	 *
2445 	 * @return FSAL status
2446 	 */
2447 	
2448 	fsal_status_t mdcache_populate_dir_chunk(mdcache_entry_t *directory,
2449 						 fsal_cookie_t whence,
2450 						 mdcache_dir_entry_t **dirent,
2451 						 struct dir_chunk *prev_chunk,
2452 						 bool *eod_met)
2453 	{
2454 		fsal_status_t status = {0, 0};
2455 		fsal_status_t readdir_status = {0, 0};
2456 		struct mdcache_populate_cb_state state;
2457 		struct dir_chunk *chunk;
2458 		attrmask_t attrmask;
2459 		fsal_cookie_t *whence_ptr = &whence;
2460 	
2461 		chunk = mdcache_get_chunk(directory, prev_chunk, whence);
2462 	
2463 		attrmask = op_ctx->fsal_export->exp_ops.fs_supported_attrs(
2464 						op_ctx->fsal_export) | ATTR_RDATTR_ERR;
2465 	
2466 		/* Take a ref on the first chunk */
2467 		mdcache_lru_ref_chunk(chunk);
2468 	
2469 		state.export = mdc_cur_export();
2470 		state.dir = directory;
2471 		state.status = &status;
2472 		state.cb = NULL;  /* We don't use the call back during chunking. */
2473 		state.first_chunk = state.cur_chunk = chunk;
2474 		state.prev_chunk = prev_chunk;
2475 		state.cookie = whence;
2476 		state.dirent = dirent;
2477 		state.whence_is_name = op_ctx->fsal_export->exp_ops.fs_supports(
2478 					op_ctx->fsal_export, fso_whence_is_name);
2479 		state.whence_search = state.whence_is_name && whence != 0 &&
2480 								prev_chunk == NULL;
2481 	
2482 		if (state.whence_is_name) {
2483 			LogFullDebugAlt(COMPONENT_NFS_READDIR, COMPONENT_CACHE_INODE,
2484 					"whence_is_name %s cookie %"
2485 					PRIx64,
2486 					state.whence_search ? "search" : "no search",
2487 					state.cookie);
2488 		}
2489 	
2490 	
2491 	again:
2492 	
2493 		/* In whence_is_name case, we may need to do another FSAL readdir
2494 		 * call to continue scanning for the desired cookie, so we will jump
2495 		 * back to here to accomplish that. chunk is newly allocated and
2496 		 * prev_chunk has been updated to point to the last cached chunk.
2497 		 */
2498 		if (state.whence_is_name) {
2499 			if (prev_chunk != NULL) {
2500 				/* Start from end of prev_chunk */
2501 				/* If end of directory, mark last dirent as eod. */
2502 				mdcache_dir_entry_t *last;
2503 	
2504 				last = glist_last_entry(&prev_chunk->dirents,
2505 							mdcache_dir_entry_t,
2506 							chunk_list);
2507 				whence_ptr = (fsal_cookie_t *)last->name;
2508 	
2509 				if (state.whence_search) {
2510 					LogFullDebugAlt(COMPONENT_NFS_READDIR,
2511 							COMPONENT_CACHE_INODE,
2512 							"Calling FSAL readdir whence = %s, search %"
2513 							PRIx64,
2514 							last->name, state.cookie);
2515 				} else {
2516 					LogFullDebugAlt(COMPONENT_NFS_READDIR,
2517 							COMPONENT_CACHE_INODE,
2518 							"Calling FSAL readdir whence = %s, no search",
2519 							last->name);
2520 				}
2521 			} else {
2522 				/* Signal start from beginning by passing NULL pointer.
2523 				 */
2524 				whence_ptr = NULL;
2525 				if (state.whence_search) {
2526 					LogFullDebugAlt(COMPONENT_NFS_READDIR,
2527 							COMPONENT_CACHE_INODE,
2528 							"Calling FSAL readdir whence = NULL, search %"
2529 							PRIx64, state.cookie);
2530 				} else {
2531 					LogFullDebugAlt(COMPONENT_NFS_READDIR,
2532 							COMPONENT_CACHE_INODE,
2533 							"Calling FSAL readdir whence = NULL, no search");
2534 				}
2535 			}
2536 		} else {
2537 			LogFullDebugAlt(COMPONENT_NFS_READDIR, COMPONENT_CACHE_INODE,
2538 					"Calling FSAL readdir whence = 0x%"PRIx64,
2539 					whence);
2540 		}
2541 	
2542 	#ifdef USE_LTTNG
2543 		tracepoint(mdcache, mdc_readdir_populate,
2544 			   __func__, __LINE__, &directory->obj_handle,
2545 			   directory->sub_handle, whence);
2546 	#endif
2547 		subcall(
2548 			readdir_status = directory->sub_handle->obj_ops->readdir(
2549 				directory->sub_handle, whence_ptr, &state,
2550 				mdc_readdir_chunked_cb, attrmask, eod_met)
2551 		       );
2552 	
2553 		if (FSAL_IS_ERROR(readdir_status)) {
2554 			LogDebugAlt(COMPONENT_NFS_READDIR, COMPONENT_CACHE_INODE,
2555 				    "FSAL readdir status=%s",
2556 				    fsal_err_txt(readdir_status));
2557 			*dirent = NULL;
2558 			mdcache_lru_unref_chunk(chunk);
2559 			return readdir_status;
2560 		}
2561 	
2562 		if (FSAL_IS_ERROR(status)) {
2563 			LogDebugAlt(COMPONENT_NFS_READDIR, COMPONENT_CACHE_INODE,
2564 				    "status=%s",
2565 				    fsal_err_txt(status));
2566 			*dirent = NULL;
2567 			mdcache_lru_unref_chunk(chunk);
2568 			return status;
2569 		}
2570 	
2571 		/* Recover the most recent chunk from cur_chunk, if we had readahead.
2572 		 * it might have changed.
2573 		 */
2574 		chunk = state.cur_chunk;
2575 	
2576 		if (chunk->num_entries == 0) {
2577 			/* Chunk is empty - should only happen for an empty directory
2578 			 * but could happen if the FSAL failed to indicate end of
2579 			 * directory. This COULD happen on a readahead chunk, but it
2580 			 * would be unusual.
2581 			 */
2582 			LogFullDebugAlt(COMPONENT_NFS_READDIR, COMPONENT_CACHE_INODE,
2583 					"Empty chunk");
2584 	
2585 			mdcache_lru_unref_chunk(chunk);
2586 	
2587 			if (chunk == state.first_chunk) {
2588 				/* We really got nothing on this readdir, so don't
2589 				 * return a dirent.
2590 				 */
2591 				*dirent = NULL;
2592 				mdcache_lru_unref_chunk(chunk);
2593 				LogDebugAlt(COMPONENT_NFS_READDIR,
2594 					    COMPONENT_CACHE_INODE,
2595 					    "status=%s",
2596 					    fsal_err_txt(status));
2597 				return status;
2598 			}
2599 	
2600 			/* If the empty chunk wasn't first, then prev_chunk is valid */
2601 			chunk = state.prev_chunk;
2602 		}
2603 	
2604 		if (*eod_met) {
2605 			/* If end of directory, mark last dirent as eod. */
2606 			mdcache_dir_entry_t *last;
2607 	
2608 			last = glist_last_entry(&chunk->dirents, mdcache_dir_entry_t,
2609 						chunk_list);
2610 			last->eod = true;
2611 		}
2612 	
2613 		LogFullDebugAlt(COMPONENT_NFS_READDIR, COMPONENT_CACHE_INODE,
2614 				"Chunk first entry %s%s",
2615 				*dirent != NULL ? (*dirent)->name : "<NONE>",
2616 				*eod_met ? " EOD" : "");
2617 	
2618 		if (state.whence_search && *dirent == NULL) {
2619 			if (*eod_met) {
2620 				/* Did not find cookie. */
2621 				status = fsalstat(ERR_FSAL_BADCOOKIE, 0);
2622 				LogDebugAlt(COMPONENT_NFS_READDIR,
2623 					    COMPONENT_CACHE_INODE,
2624 					    "Could not find search cookie status=%s",
2625 					    fsal_err_txt(status));
2626 				return status;
2627 			}
2628 	
2629 			/* We are re-scanning directory, and we have not found our
2630 			 * cookie yet, we either used up the FSAL's readdir (with any
2631 			 * readahead) or we collided with an already cached chunk,
2632 			 * which we know DOES NOT have our cookie (because otherwise we
2633 			 * would have found it on lookup), so we will start from where
2634 			 * we left off.
2635 			 *
2636 			 * chunk points to the last valid chunk of what we just read,
2637 			 * but we also have to check if we must skip chunks that had
2638 			 * already been in cache.
2639 			 *
2640 			 * If chunk->next_ck is 0, then we didn't collide, so there are
2641 			 * no chunks to skip.
2642 			 */
2643 			LogFullDebugAlt(COMPONENT_NFS_READDIR, COMPONENT_CACHE_INODE,
2644 					"Rescan dir to find cookie needs to continue search for %"
2645 					PRIx64, state.cookie);
2646 	
2647 			if (chunk->next_ck != 0) {
2648 				/* In the collision case, chunk->next_ck was set,
2649 				 * so now start skipping.
2650 				 */
2651 				LogFullDebugAlt(COMPONENT_NFS_READDIR,
2652 						COMPONENT_CACHE_INODE,
2653 						"Search skipping from cookie %"PRIx64,
2654 						chunk->next_ck);
2655 				chunk = mdcache_skip_chunks(directory, chunk->next_ck);
2656 			}
2657 	
2658 			/* We need to start a new FSAL readdir call, but we don't just
2659 			 * want to call mdcache_populate_dir_chunk raw, so set up a few
2660 			 * things and jump to again...
2661 			 */
2662 			/* The chunk we just dealt with is now prev_chunk. */
2663 			prev_chunk = chunk;
2664 	
2665 			LogFullDebugAlt(COMPONENT_NFS_READDIR, COMPONENT_CACHE_INODE,
2666 					"About to allocate a new chunk to continue search, prev chunk = %p",
2667 					prev_chunk);
2668 	
2669 			/* And we need to allocate a fresh chunk. */
2670 			chunk = mdcache_get_chunk(directory, chunk, 0);
2671 	
2672 			/* And switch over to new chunk. */
2673 			state.cur_chunk = chunk;
2674 			state.prev_chunk = prev_chunk;
2675 	
2676 			/* And go start a new FSAL readdir call.  */
2677 			goto again;
2678 		}
2679 	
2680 		if (*dirent == NULL) {
2681 			/* We haven't set dirent yet, return the first entry of the
2682 			 * first chunk.
2683 			 */
2684 			*dirent = glist_first_entry(&state.first_chunk->dirents,
2685 						    mdcache_dir_entry_t,
2686 						    chunk_list);
2687 		}
2688 	
2689 		LogDebugAlt(COMPONENT_NFS_READDIR, COMPONENT_CACHE_INODE,
2690 			    "status=%s",
2691 			    fsal_err_txt(status));
2692 	
2693 		return status;
2694 	}
2695 	
2696 	/**
2697 	 * @brief Read the contents of a directory
2698 	 *
2699 	 * If necessary, populate dirent cache chunks from the underlying FSAL. Then,
2700 	 * walk the dirent cache chunks calling the callback.
2701 	 *
2702 	 * Interactions between readdir and entry LRU lifetime is complicated.  We want
2703 	 * the LRU to be scan resistant, so that readdir() doesn't empty useful entries
2704 	 * from the LRU.  However, the readdir() has to work in such a way that it's
2705 	 * entries are still in the cache when they're used.  To achieve this, we do two
2706 	 * things:
2707 	 *
2708 	 * First, we insert objects created during a scan into the MRU of L2, rather
2709 	 * than the LRU of L1.  This allows them to be recycled in FIFO order rather
2710 	 * than LIFO order.  Observed behavior was that, when we are over the hi-water
2711 	 * mark, readdir() of a large directory would empty the L2 by recycling entries.
2712 	 * Then, it would start recycling the LRU of L1.  However, the LRU of L1
2713 	 * contained entries created during the readdir().  This means that, after the
2714 	 * chunk loaded, and it's entries need to be returned to the upper layer, they
2715 	 * have been recycled, and need to be re-created via a lookup() and getattr()
2716 	 * pair, causing large numbers of round-trips to the cluster.  Inserting into
2717 	 * the MRU of L2 keeps the L2 from being emptied, and causes the entries to be
2718 	 * recycled FIFO, making it likely that the entries for a chunk are still in the
2719 	 * cache when needed.
2720 	 *
2721 	 * The second important thing to do is to *not* take an INITIAL ref on entries
2722 	 * when they are used during the scan.  An INITIAL ref promotes the entry in the
2723 	 * LRU, which would put it at LRU of L1, recreating the above situation.  To
2724 	 * avoid this, and keep scan resistance, we take a non-initial ref during
2725 	 * readdir().
2726 	 *
2727 	 * @note The object passed into the callback is ref'd and must be unref'd by the
2728 	 * callback.
2729 	 *
2730 	 * @param[in] directory  The directory to read
2731 	 * @param[in] whence     Where to start (next)
2732 	 * @param[in] dir_state  Pass thru of state to callback
2733 	 * @param[in] cb         Callback function
2734 	 * @param[in] attrmask   Which attributes to fill
2735 	 * @param[out] eod_met   eod marker true == end of dir
2736 	 *
2737 	 * @return FSAL status
2738 	 */
2739 	
2740 	fsal_status_t mdcache_readdir_chunked(mdcache_entry_t *directory,
2741 					      fsal_cookie_t whence,
2742 					      void *dir_state,
2743 					      fsal_readdir_cb cb,
2744 					      attrmask_t attrmask,
2745 					      bool *eod_met)
2746 	{
2747 		mdcache_dir_entry_t *dirent = NULL;
2748 		bool has_write, set_first_ck;
2749 		fsal_cookie_t next_ck = whence, look_ck = whence;
2750 		struct dir_chunk *chunk = NULL;
2751 		bool first_pass = true;
2752 		bool eod = false;
2753 		bool reload_chunk = false;
2754 	
2755 	#ifdef USE_LTTNG
(1) Event cond_true: Condition "!!({...; *((int volatile *)&__tracepoint_mdcache___mdc_readdir.state);})", taking true branch.
(2) Event cond_true: Condition "!!({...; *((int volatile *)&__tracepoint_mdcache___mdc_readdir.state);})", taking true branch.
2756 		tracepoint(mdcache, mdc_readdir,
2757 			   __func__, __LINE__, &directory->obj_handle);
2758 	#endif
(3) Event cond_true: Condition "!!(component_log_level[COMPONENT_NFS_READDIR] >= NIV_FULL_DEBUG)", taking true branch.
(4) Event cond_true: Condition "!!(component_log_level[COMPONENT_NFS_READDIR] >= NIV_FULL_DEBUG)", taking true branch.
(5) Event cond_true: Condition "test_mde_flags(directory, 4U /* FSAL_UP_INVALIDATE_CONTENT */)", taking true branch.
(6) Event cond_true: Condition "test_mde_flags(directory, 16U /* FSAL_UP_INVALIDATE_DIR_CHUNKS */)", taking true branch.
2759 		LogFullDebugAlt(COMPONENT_NFS_READDIR, COMPONENT_CACHE_INODE,
2760 				"Starting chunked READDIR for %p, MDCACHE_TRUST_CONTENT %s, MDCACHE_TRUST_DIR_CHUNKS %s",
2761 				directory,
2762 				test_mde_flags(directory, MDCACHE_TRUST_CONTENT)
2763 					? "true" : "false",
2764 				test_mde_flags(directory, MDCACHE_TRUST_DIR_CHUNKS)
2765 					? "true" : "false");
2766 	
2767 		/* Dirent's are being chunked; check to see if it needs updating */
(7) Event cond_false: Condition "!test_mde_flags(directory, 20U /* FSAL_UP_INVALIDATE_CONTENT | FSAL_UP_INVALIDATE_DIR_CHUNKS */)", taking false branch.
2768 		if (!test_mde_flags(directory, MDCACHE_TRUST_CONTENT |
2769 					       MDCACHE_TRUST_DIR_CHUNKS)) {
2770 			/* Clean out the existing entries in the directory. */
2771 			LogFullDebugAlt(COMPONENT_NFS_READDIR, COMPONENT_CACHE_INODE,
2772 					"Flushing invalid dirent cache");
2773 			PTHREAD_RWLOCK_wrlock(&directory->content_lock);
2774 			mdcache_dirent_invalidate_all(directory);
2775 			has_write = true;
(8) Event else_branch: Reached else branch.
2776 		} else {
(9) Event lock: Locking "&directory->content_lock".
(10) Event cond_true: Condition "rc == 0", taking true branch.
(11) Event cond_true: Condition "!!(component_log_level[COMPONENT_RW_LOCK] >= NIV_FULL_DEBUG)", taking true branch.
(12) Event cond_true: Condition "!!(component_log_level[COMPONENT_RW_LOCK] >= NIV_FULL_DEBUG)", taking true branch.
(13) Event if_fallthrough: Falling through to end of if statement.
(14) Event if_end: End of if statement.
Also see events: [def][unlock][lockagain][use]
2777 			PTHREAD_RWLOCK_rdlock(&directory->content_lock);
2778 			has_write = false;
2779 		}
2780 	
(15) Event cond_true: Condition "look_ck == 0", taking true branch.
2781 		if (look_ck == 0) {
2782 			/* If starting from beginning, use the first_ck from the
2783 			 * directory instead, this is only non-zero if the first
2784 			 * chunk of the directory is still present.
2785 			 */
(16) Event def: Assigning data that might be protected by the lock to "look_ck".
Also see events: [lock][unlock][lockagain][use]
2786 			look_ck = directory->fsobj.fsdir.first_ck;
2787 		}
2788 	
2789 		/* We need to know if we need to set first_ck. */
(17) Event cond_true: Condition "whence == 0", taking true branch.
(18) Event cond_true: Condition "look_ck == 0", taking true branch.
(19) Event cond_true: Condition "whence == 0 && look_ck == 0", taking true branch.
2790 		set_first_ck = whence == 0 && look_ck == 0;
2791 	
(36) Event label: Reached label "again".
2792 	again:
2793 		/* Get here on first pass, retry if we don't hold the write lock,
2794 		 * and repeated passes if we need to fetch another chunk.
2795 		 */
2796 	
(20) Event cond_true: Condition "!!(component_log_level[COMPONENT_NFS_READDIR] >= NIV_FULL_DEBUG)", taking true branch.
(21) Event cond_true: Condition "!!(component_log_level[COMPONENT_NFS_READDIR] >= NIV_FULL_DEBUG)", taking true branch.
(37) Event cond_true: Condition "!!(component_log_level[COMPONENT_NFS_READDIR] >= NIV_FULL_DEBUG)", taking true branch.
(38) Event cond_true: Condition "!!(component_log_level[COMPONENT_NFS_READDIR] >= NIV_FULL_DEBUG)", taking true branch.
(40) Event use: Using an unreliable value of "look_ck" inside the second locked section. If the data that "look_ck" depends on was changed by another thread, this use might be incorrect.
Also see events: [lock][def][unlock][lockagain]
2797 		LogFullDebugAlt(COMPONENT_NFS_READDIR, COMPONENT_CACHE_INODE,
2798 				"Readdir chunked next_ck=0x%"PRIx64" look_ck=%"PRIx64,
2799 				next_ck, look_ck);
2800 	
(22) Event cond_true: Condition "look_ck == 0", taking true branch.
2801 		if (look_ck == 0 ||
2802 		    !mdcache_avl_lookup_ck(directory, look_ck, &dirent)) {
2803 			fsal_status_t status;
2804 			/* This starting position isn't in our cache...
2805 			 * Go populate the cache and process from there.
2806 			 */
(23) Event cond_true: Condition "!has_write", taking true branch.
2807 			if (!has_write) {
2808 				/* Upgrade to write lock and retry just in case
2809 				 * another thread managed to populate this cookie
2810 				 * in the meantime.
2811 				 */
(24) Event unlock: Unlocking "&directory->content_lock". "look_ck" might now be unreliable because other threads can now change the data that it depends on.
(25) Event cond_true: Condition "rc == 0", taking true branch.
(26) Event cond_true: Condition "!!(component_log_level[COMPONENT_RW_LOCK] >= NIV_FULL_DEBUG)", taking true branch.
(27) Event cond_true: Condition "!!(component_log_level[COMPONENT_RW_LOCK] >= NIV_FULL_DEBUG)", taking true branch.
(28) Event if_fallthrough: Falling through to end of if statement.
(29) Event if_end: End of if statement.
Also see events: [lock][def][lockagain][use]
2812 				PTHREAD_RWLOCK_unlock(&directory->content_lock);
(30) Event cond_true: Condition "rc == 0", taking true branch.
(31) Event cond_true: Condition "!!(component_log_level[COMPONENT_RW_LOCK] >= NIV_FULL_DEBUG)", taking true branch.
(32) Event cond_true: Condition "!!(component_log_level[COMPONENT_RW_LOCK] >= NIV_FULL_DEBUG)", taking true branch.
(33) Event if_fallthrough: Falling through to end of if statement.
(34) Event if_end: End of if statement.
(39) Event lockagain: Locking "&directory->content_lock" again.
Also see events: [lock][def][unlock][use]
2813 				PTHREAD_RWLOCK_wrlock(&directory->content_lock);
2814 				has_write = true;
2815 				first_pass = true;
2816 				chunk = NULL;
(35) Event goto: Jumping to label "again".
2817 				goto again;
2818 			}
2819 	
2820 			/* Assure that dirent is NULL */
2821 			dirent = NULL;
2822 	
2823 			if (look_ck != 0 &&
2824 			    look_ck == directory->fsobj.fsdir.first_ck) {
2825 				/* We failed to find the first dentry in the directory,
2826 				 * and will load this chunk.  Make sure we save
2827 				 * whatever is the new first_ck. */
2828 				set_first_ck = true;
2829 			}
2830 	
2831 			if (op_ctx->fsal_export->exp_ops.fs_supports(
2832 					op_ctx->fsal_export, fso_whence_is_name)
2833 			    && first_pass && directory->fsobj.fsdir.first_ck != 0) {
2834 				/* If whence must be the directory entry name we wish
2835 				 * to continue from, we need to start at the beginning
2836 				 * of the directory and readdir until we find the
2837 				 * caller's cookie, but we have the beginning of the
2838 				 * directory cached, so skip any chunks cached from
2839 				 * the start.
2840 				 *
2841 				 * Since the chunk we pass to
2842 				 * mdcache_populate_dir_chunk is the previous chunk
2843 				 * that function will use the chunk we resolved to
2844 				 * fetch the dirent name to continue from.
2845 				 *
2846 				 * If we DID NOT HAVE at least the first chunk cached,
2847 				 * mdcache_populate_dir_chunk MUST start from the
2848 				 * beginning, this is signaled by the fact that
2849 				 * prev_chunk will be NULL.
2850 				 *
2851 				 * In any case, whence will be the cookie we are looking
2852 				 * for.
2853 				 */
2854 				LogFullDebugAlt(COMPONENT_NFS_READDIR,
2855 						COMPONENT_CACHE_INODE,
2856 						"Search skipping initial chunks to find cookie");
2857 				chunk = mdcache_skip_chunks(
2858 					directory, directory->fsobj.fsdir.first_ck);
2859 				/* Since first_ck was not 0, we MUST have found at least
2860 				 * one chunk...
2861 				 */
2862 				assert(chunk != NULL);
2863 			}
2864 	
2865 			LogFullDebugAlt(COMPONENT_NFS_READDIR, COMPONENT_CACHE_INODE,
2866 					"Readdir chunked about to populate chunk %p next_ck=0x%"
2867 					PRIx64, chunk, next_ck);
2868 	
2869 			/* No, we need to populate a chunk using this cookie.
2870 			 *
2871 			 * NOTE: empty directory can result in dirent being NULL, and
2872 			 *       we will ALWAYS re-read an empty directory every time.
2873 			 *       Although we do end up setting MDCACHE_DIR_POPULATED on
2874 			 *       an empty directory, we don't consider that here, and
2875 			 *       will re-read the directory.
2876 			 */
2877 			status = mdcache_populate_dir_chunk(directory, next_ck,
2878 							    &dirent, chunk, &eod);
2879 	
2880 			if (FSAL_IS_ERROR(status)) {
2881 				PTHREAD_RWLOCK_unlock(&directory->content_lock);
2882 	
2883 				LogFullDebugAlt(COMPONENT_NFS_READDIR,
2884 						COMPONENT_CACHE_INODE,
2885 						"mdcache_populate_dir_chunk failed status=%s",
2886 						fsal_err_txt(status));
2887 	
2888 				if (status.major == ERR_FSAL_STALE)
2889 					mdcache_kill_entry(directory);
2890 	
2891 				return status;
2892 			}
2893 	
2894 			if (dirent == NULL) {
2895 				/* We must have reached the end of the directory, or the
2896 				 * directory was empty. In any case, there is no next
2897 				 * chunk or dirent.
2898 				 */
2899 				*eod_met = true;
2900 	
2901 				if (whence == 0) {
2902 					/* Since eod is true and whence is 0, we know
2903 					 * the entire directory is populated. This can
2904 					 * indicate that an empty directory may be
2905 					 * considered "populated."
2906 					 */
2907 					atomic_set_uint32_t_bits(&directory->mde_flags,
2908 								 MDCACHE_DIR_POPULATED);
2909 				}
2910 	
2911 				PTHREAD_RWLOCK_unlock(&directory->content_lock);
2912 	
2913 				LogFullDebugAlt(COMPONENT_NFS_READDIR,
2914 						COMPONENT_CACHE_INODE,
2915 						"readdir completed, eod = %s",
2916 						*eod_met ? "true" : "false");
2917 	
2918 				return status;
2919 			}
2920 	
2921 			if ((whence == 0) && eod) {
2922 				/* We started at the beginning of the directory and
2923 				 * populated through to end of directory, thus we can
2924 				 * indicate the directory is fully populated.
2925 				 */
2926 				atomic_set_uint32_t_bits(&directory->mde_flags,
2927 							 MDCACHE_DIR_POPULATED);
2928 			} else {
2929 				/* Since we just populated a chunk and have not
2930 				 * determined that we read the entire directory, make
2931 				 * sure the MDCACHE_DIR_POPULATED is cleared.
2932 				 */
2933 				atomic_clear_uint32_t_bits(&directory->mde_flags,
2934 							   MDCACHE_DIR_POPULATED);
2935 			}
2936 	
2937 			chunk = dirent->chunk;
2938 	
2939 			LogFullDebugAlt(COMPONENT_NFS_READDIR,
2940 					COMPONENT_CACHE_INODE,
2941 					"mdcache_populate_dir_chunk finished chunk %p dirent %p %s",
2942 					chunk, dirent, dirent->name);
2943 	
2944 			if (set_first_ck) {
2945 				/* We just populated the first dirent in the directory,
2946 				 * save it's cookie as first_ck.
2947 				 */
2948 				LogFullDebugAlt(COMPONENT_NFS_READDIR,
2949 						COMPONENT_CACHE_INODE,
2950 						"Setting directory first_ck=%"PRIx64,
2951 						dirent->ck);
2952 				directory->fsobj.fsdir.first_ck = dirent->ck;
2953 				set_first_ck = false;
2954 			}
2955 		} else {
2956 			/* We found the dirent... If next_ck is NOT whence, we SHOULD
2957 			 * have found the first dirent in the chunk, if not, then
2958 			 * something went wrong at some point. That chunk is valid,
2959 			 */
2960 			chunk = dirent->chunk;
2961 			LogFullDebugAlt(COMPONENT_NFS_READDIR,
2962 					COMPONENT_CACHE_INODE,
2963 					"found dirent in cached chunk %p dirent %p %s",
2964 					chunk, dirent, dirent->name);
2965 		}
2966 	
2967 		/* Bump the chunk in the LRU */
2968 		lru_bump_chunk(chunk);
2969 	
2970 		/* We can drop the ref now, we've bumped.  This cannot be the last ref
2971 		 * drop.  To get here, we had at least 2 refs, and we also hold the
2972 		 * content_lock for at least read.  This means noone holds it for write,
2973 		 * and all final ref drops are done with it held for write. */
2974 		mdcache_lru_unref_chunk(chunk);
2975 	
2976 		LogFullDebugAlt(COMPONENT_NFS_READDIR, COMPONENT_CACHE_INODE,
2977 				"About to read directory=%p cookie=%" PRIx64,
2978 				directory, next_ck);
2979 	
2980 		/* Now satisfy the request from the cached readdir--stop when either
2981 		 * the requested sequence or dirent sequence is exhausted */
2982 	
2983 		for (;
2984 		     dirent != NULL;
2985 		     dirent = glist_next_entry(&chunk->dirents,
2986 					       mdcache_dir_entry_t,
2987 					       chunk_list,
2988 					       &dirent->chunk_list)) {
2989 			fsal_status_t status;
2990 			enum fsal_dir_result cb_result;
2991 			mdcache_entry_t *entry = NULL;
2992 			struct attrlist attrs;
2993 	
2994 			if (dirent->flags & DIR_ENTRY_FLAG_DELETED) {
2995 				/* Skip deleted entries */
2996 				continue;
2997 			}
2998 	
2999 			status.major = ERR_FSAL_NO_ERROR;
3000 			/* We have the content_lock for at least read. */
3001 			if (dirent->entry) {
3002 				/* Take a ref for our use */
3003 				entry = dirent->entry;
3004 				mdcache_get(entry);
3005 			} else {
3006 				/* Not cached, get actual entry using the dirent ckey */
3007 				status = mdcache_find_keyed_reason(&dirent->ckey,
3008 								   &entry,
3009 								   MDC_REASON_SCAN);
3010 			}
3011 	
3012 			if (FSAL_IS_ERROR(status)) {
3013 				/* Failed using ckey, do full lookup. */
3014 				LogFullDebugAlt(COMPONENT_NFS_READDIR,
3015 						COMPONENT_CACHE_INODE,
3016 						"Lookup by key for %s failed, lookup by name now",
3017 						dirent->name);
3018 	
3019 				/* mdc_lookup_uncached needs write lock, dropping the
3020 				 * read lock means we can no longer trust the dirent or
3021 				 * the chunk.
3022 				 */
3023 				if (!has_write) {
3024 					/* We will have to re-find this dirent after we
3025 					 * re-acquire the lock.
3026 					 */
3027 					look_ck = dirent->ck;
3028 	
3029 					PTHREAD_RWLOCK_unlock(&directory->content_lock);
3030 					PTHREAD_RWLOCK_wrlock(&directory->content_lock);
3031 					has_write = true;
3032 	
3033 					/* Dropping the content_lock may have
3034 					 * invalidated some or all of the dirents and/or
3035 					 * chunks in this directory.  We need to start
3036 					 * over from this point.  look_ck is now correct
3037 					 * if the dirent is still cached, and we haven't
3038 					 * changed next_ck, so it's still correct for
3039 					 * reloading the chunk.
3040 					 */
3041 					first_pass = true;
3042 					chunk = NULL;
3043 	
3044 					/* Now we need to look for this dirent again.
3045 					 * We haven't updated next_ck for this dirent
3046 					 * yet, so it is the right whence to use for a
3047 					 * repopulation readdir if the chunk is
3048 					 * discarded.
3049 					 */
3050 					goto again;
3051 				} else if (op_ctx->fsal_export->exp_ops.fs_supports(
3052 					   op_ctx->fsal_export, fso_readdir_plus)) {
3053 	
3054 					/* If the FSAL supports readdir_plus, then a
3055 					 * single round-trip for the chunk is preferable
3056 					 * to lookups for every missing obj.  Nuke the
3057 					 * chunk, and reload it using readdir_plus */
3058 					look_ck = dirent->ck;
3059 					next_ck = chunk->reload_ck;
3060 					reload_chunk = true;
3061 					LogFullDebugAlt(COMPONENT_NFS_READDIR,
3062 							COMPONENT_CACHE_INODE,
3063 							"Reloading chunk %p look_ck %"
3064 							PRIx64" next_ck %"PRIx64,
3065 							chunk, look_ck, next_ck);
3066 					/* In order to get here, we passed the has_write
3067 					 * check above, and took the write lock. */
3068 					mdcache_lru_unref_chunk(chunk);
3069 					chunk = NULL;
3070 					goto again;
3071 				}
3072 	
3073 				status = mdc_lookup_uncached(directory, dirent->name,
3074 							     &entry, NULL);
3075 	
3076 				if (FSAL_IS_ERROR(status)) {
3077 					PTHREAD_RWLOCK_unlock(&directory->content_lock);
3078 	
3079 					LogFullDebugAlt(COMPONENT_NFS_READDIR,
3080 							COMPONENT_CACHE_INODE,
3081 							"lookup by name failed status=%s",
3082 							fsal_err_txt(status));
3083 	
3084 					if (status.major == ERR_FSAL_STALE)
3085 						mdcache_kill_entry(directory);
3086 	
3087 					return status;
3088 				}
3089 			}
3090 	
3091 			if (has_write && dirent->entry) {
3092 				/* If we get here, we have the write lock, have an
3093 				 * entry, and took a ref on it above.  The dirent also
3094 				 * has a ref on the entry.  Drop that ref now.  This can
3095 				 * only be done under the write lock.  If we don't have
3096 				 * the write lock, then this was not the readdir that
3097 				 * took the ref, and another readdir will drop the ref,
3098 				 * or it will be dropped when the dirent is cleaned up.
3099 				 * */
3100 				mdcache_put(dirent->entry);
3101 				dirent->entry = NULL;
3102 			}
3103 	
3104 			if (reload_chunk && look_ck != 0 && dirent->ck !=
3105 				   look_ck) {
3106 				LogFullDebugAlt(COMPONENT_NFS_READDIR,
3107 						COMPONENT_CACHE_INODE,
3108 						"Skipping already used dirent %s (%p)",
3109 						dirent->name, &entry->obj_handle);
3110 				/* This chunk was reloaded, but some dirents were
3111 				 * already consumed.  Deref and continue */
3112 				mdcache_put(entry);
3113 				continue;
3114 			}
3115 	
3116 			if (dirent->ck == whence) {
3117 				/* When called with whence, the caller always wants the
3118 				 * next entry, skip this entry. */
3119 				mdcache_put(entry);
3120 				continue;
3121 			}
3122 	
3123 			next_ck = dirent->ck;
3124 			LogFullDebugAlt(COMPONENT_NFS_READDIR, COMPONENT_CACHE_INODE,
3125 					"Setting next_ck=%"PRIx64,
3126 					next_ck);
3127 	
3128 			/* Ensure the attribute cache is valid.  The simplest way to do
3129 			 * this is to call getattrs().  We need a copy anyway, to ensure
3130 			 * thread safety.
3131 			 */
3132 			fsal_prepare_attrs(&attrs, attrmask);
3133 	
3134 			status = entry->obj_handle.obj_ops->getattrs(&entry->obj_handle,
3135 								    &attrs);
3136 			if (FSAL_IS_ERROR(status)) {
3137 				PTHREAD_RWLOCK_unlock(&directory->content_lock);
3138 	
3139 				LogFullDebugAlt(COMPONENT_NFS_READDIR,
3140 						COMPONENT_CACHE_INODE,
3141 						"getattrs failed status=%s",
3142 						fsal_err_txt(status));
3143 	
3144 				mdcache_put(entry);
3145 				return status;
3146 			}
3147 	
3148 	#ifdef USE_LTTNG
3149 			tracepoint(mdcache, mdc_readdir_cb,
3150 				   __func__, __LINE__, dirent->name, &entry->obj_handle,
3151 				   entry->sub_handle, entry->lru.refcnt);
3152 	#endif
3153 			cb_result = cb(dirent->name, &entry->obj_handle, &entry->attrs,
3154 				       dir_state, dirent->ck);
3155 	
3156 			fsal_release_attrs(&attrs);
3157 	
3158 			/* The ref on entry was put by the callback.  Don't use it
3159 			 * anymore */
3160 	
3161 			LogFullDebugAlt(COMPONENT_NFS_READDIR, COMPONENT_CACHE_INODE,
3162 					"dirent = %p %s, cb_result = %s, eod = %s",
3163 					dirent, dirent->name,
3164 					fsal_dir_result_str(cb_result),
3165 					dirent->eod ? "true" : "false");
3166 	
3167 			if (cb_result >= DIR_TERMINATE || dirent->eod) {
3168 				/* Caller is done, or we have reached the end of
3169 				 * the directory, no need to get another dirent.
3170 				 */
3171 	
3172 				/* If cb_result is DIR_TERMINATE, the callback did
3173 				 * not consume this entry, so we can not have reached
3174 				 * end of directory.
3175 				 */
3176 				*eod_met = cb_result != DIR_TERMINATE && dirent->eod;
3177 	
3178 				if (*eod_met && whence == 0) {
3179 					/* Since eod is true and whence is 0, we know
3180 					 * the entire directory is populated.
3181 					 */
3182 					atomic_set_uint32_t_bits(&directory->mde_flags,
3183 								 MDCACHE_DIR_POPULATED);
3184 				}
3185 	
3186 				LogDebugAlt(COMPONENT_NFS_READDIR,
3187 					    COMPONENT_CACHE_INODE,
3188 					    "readdir completed, eod = %s",
3189 					    *eod_met ? "true" : "false");
3190 	
3191 				PTHREAD_RWLOCK_unlock(&directory->content_lock);
3192 	
3193 				return status;
3194 			}
3195 	
3196 			reload_chunk = false;
3197 		}
3198 	
3199 		if (chunk->next_ck != 0) {
3200 			/* If the chunk has a known chunk following it, use the first
3201 			 * cookie in that chunk for AVL tree lookup, which will succeed
3202 			 * rather than having to do a readdir to find the next entry.
3203 			 *
3204 			 * If the chunk is no longer present, the lookup will fail, in
3205 			 * which case next_ck is the right cookie to use as the whence
3206 			 * for the next readdir.
3207 			 */
3208 			look_ck = chunk->next_ck;
3209 			LogFullDebugAlt(COMPONENT_NFS_READDIR, COMPONENT_CACHE_INODE,
3210 					"Setting look_ck from next_ck=%"PRIx64,
3211 					chunk->next_ck);
3212 		} else {
3213 			/* The next chunk is not resident, or we don't know what the
3214 			 * next_ck is. Skip right to populating the next chunk. next_ck
3215 			 * is the right cookie to use as the whence for the next
3216 			 * readdir.
3217 			 */
3218 			look_ck = 0;
3219 		}
3220 	
3221 		/* Due to the conditions we return from inside the loop, we know that if
3222 		 * we reach the end of the chunk we must fetch another chunk to satisfy
3223 		 * the directory read. The next_ck is the cookie for the next dirent to
3224 		 * find, which should be the first dirent of the next chunk.
3225 		 */
3226 	
3227 		/* NOTE: An FSAL that does not return 0 or LAST_COOKIE
3228 		 *       as the cookie for the last directory entry will
3229 		 *       result in our attempting to find one more
3230 		 *       chunk, which will not succeed and then the eod
3231 		 *       condition detected above before the while loop
3232 		 *       will kick in.
3233 		 */
3234 	
3235 		/* NOTE: We also keep the write lock if we already had
3236 		 *       it. Most likely we will need to populate the
3237 		 *       next chunk also. It's probably not worth
3238 		 *       dropping the write lock and taking the read
3239 		 *       lock just in case the next chunk actually
3240 		 *       happens to be populated.
3241 		 */
3242 		first_pass = false;
3243 		goto again;
3244 	}
3245 	
3246 	/**
3247 	 * @brief Forcibly remove an entry from the cache (top half)
3248 	 *
3249 	 * This function is used to invalidate a cache entry when it
3250 	 * has become unusable (for example, when the FSAL declares it to be
3251 	 * stale).
3252 	 *
3253 	 * To simplify interaction with the SAL, this function no longer
3254 	 * finalizes the entry, but schedules the entry for out-of-line
3255 	 * cleanup, after first making it unreachable.
3256 	 *
3257 	 * @param[in] entry The entry to be killed
3258 	 */
3259 	
3260 	void
3261 	_mdcache_kill_entry(mdcache_entry_t *entry,
3262 			    char *file, int line, char *function)
3263 	{
3264 		bool freed;
3265 	
3266 		if (isDebug(COMPONENT_CACHE_INODE)) {
3267 			DisplayLogComponentLevel(COMPONENT_CACHE_INODE,
3268 						 file, line, function, NIV_DEBUG,
3269 						 "Kill %s entry %p obj_handle %p",
3270 						 object_file_type_to_str(
3271 								entry->obj_handle.type),
3272 						 entry, &entry->obj_handle);
3273 		}
3274 	
3275 		freed = cih_remove_checked(entry); /* !reachable, drop sentinel ref */
3276 	#ifdef USE_LTTNG
3277 		tracepoint(mdcache, mdc_kill_entry,
3278 			   function, line, &entry->obj_handle, entry->lru.refcnt,
3279 			   freed);
3280 	#endif
3281 	
3282 		if (!freed) {
3283 			/* queue for cleanup */
3284 			mdcache_lru_cleanup_push(entry);
3285 		}
3286 	
3287 	}
3288 	
3289 	/**
3290 	 * @brief Update the cached attributes
3291 	 *
3292 	 * Update the cached attributes on @a entry with the attributes in @a attrs
3293 	 *
3294 	 * @note The caller must hold the attribute lock for WRITE
3295 	 *
3296 	 * @param[in] entry	Entry to update
3297 	 * @param[in] attrs	New attributes to cache
3298 	 * @return FSAL status
3299 	 */
3300 	void mdc_update_attr_cache(mdcache_entry_t *entry, struct attrlist *attrs)
3301 	{
3302 		if (entry->attrs.acl != NULL) {
3303 			/* We used to have an ACL... */
3304 			if (attrs->acl != NULL) {
3305 				/* We got an ACL from the sub FSAL whether we asked for
3306 				 * it or not, given that we had an ACL before, and we
3307 				 * got a new one, update the ACL, so release the old
3308 				 * one.
3309 				 */
3310 				nfs4_acl_release_entry(entry->attrs.acl);
3311 			} else {
3312 				/* A new ACL wasn't provided, so move the old one
3313 				 * into the new attributes so it will be preserved
3314 				 * by the fsal_copy_attrs.
3315 				 */
3316 				attrs->acl = entry->attrs.acl;
3317 				attrs->valid_mask |= ATTR_ACL;
3318 			}
3319 	
3320 			/* NOTE: Because we already had an ACL,
3321 			 * entry->attrs.request_mask MUST have the ATTR_ACL bit set.
3322 			 * This will assure that fsal_copy_attrs below will copy the
3323 			 * selected ACL (old or new) into entry->attrs.
3324 			 */
3325 	
3326 			/* ACL was released or moved to new attributes. */
3327 			entry->attrs.acl = NULL;
3328 		} else if (attrs->acl != NULL) {
3329 			/* We didn't have an ACL before, but we got a new one. We may
3330 			 * not have asked for it, but receive it anyway.
3331 			 */
3332 			entry->attrs.request_mask |= ATTR_ACL;
3333 		}
3334 	
3335 		// Same as above but for fs_locations
3336 		if (entry->attrs.fs_locations != NULL) {
3337 			if (attrs->fs_locations != NULL) {
3338 				nfs4_fs_locations_release(entry->attrs.fs_locations);
3339 			} else {
3340 				attrs->fs_locations = entry->attrs.fs_locations;
3341 				attrs->valid_mask |= ATTR4_FS_LOCATIONS;
3342 			}
3343 	
3344 			entry->attrs.fs_locations = NULL;
3345 		} else if (attrs->fs_locations != NULL) {
3346 			entry->attrs.request_mask |= ATTR4_FS_LOCATIONS;
3347 		}
3348 	
3349 		// Same as above but for sec_label
3350 		if (entry->attrs.sec_label.slai_data.slai_data_val != NULL) {
3351 			char *secdata = entry->attrs.sec_label.slai_data.slai_data_val;
3352 	
3353 			if (attrs->sec_label.slai_data.slai_data_val != NULL) {
3354 				gsh_free(secdata);
3355 			} else {
3356 				attrs->sec_label.slai_data.slai_data_len =
3357 					entry->attrs.sec_label.slai_data.slai_data_len;
3358 				attrs->sec_label.slai_data.slai_data_val = secdata;
3359 				attrs->valid_mask |= ATTR4_SEC_LABEL;
3360 			}
3361 	
3362 			entry->attrs.sec_label.slai_data.slai_data_len = 0;
3363 			entry->attrs.sec_label.slai_data.slai_data_val = NULL;
3364 		} else if (attrs->sec_label.slai_data.slai_data_val != NULL) {
3365 			entry->attrs.request_mask |= ATTR4_SEC_LABEL;
3366 		}
3367 	
3368 		if (attrs->expire_time_attr == 0) {
3369 			/* FSAL did not set this, retain what was in the entry. */
3370 			attrs->expire_time_attr = entry->attrs.expire_time_attr;
3371 		}
3372 	
3373 		/* Now move the new attributes into the entry. */
3374 		fsal_copy_attrs(&entry->attrs, attrs, true);
3375 	
3376 		/* Note that we use &entry->attrs here in case attrs.request_mask was
3377 		 * modified by the FSAL. entry->attrs.request_mask reflects the
3378 		 * attributes we requested, and was updated to "request" ACL if the
3379 		 * FSAL provided one for us gratis.
3380 		 */
3381 		mdc_fixup_md(entry, &entry->attrs);
3382 	}
3383 	
3384 	/** @} */
3385