/*
 * Copyright (C) CohortFS (2014)
 * contributor : William Allen Simpson <bill@CohortFS.com>
 *
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public License
 * as published by the Free Software Foundation; either version 3 of
 * the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
 * 02110-1301 USA
 *
 * ---------------------------------------
 */

/**
 * @file  ds.c
 * @brief Data Server parsing and management
 */
#include "config.h"
#include "config_parsing.h"
#include "log.h"
#include "fsal.h"
#include "nfs_core.h"
#include "FSAL/fsal_commonlib.h"
#include "pnfs_utils.h"

/**
 * @brief Servers are stored in an AVL tree with front-end cache.
 *
 * @note  number of cache slots should be prime.
 */
#define SERVER_BY_ID_CACHE_SIZE 193

struct server_by_id {
	pthread_rwlock_t lock;	/* protects the tree, the cache, and dslist */
	struct avltree t;	/* servers indexed by id_servers */
	struct avltree_node *cache[SERVER_BY_ID_CACHE_SIZE];	/* front-end cache */
};

static struct server_by_id server_by_id;

/** List of all active data servers,
 *  protected by server_by_id.lock
 */
static struct glist_head dslist;

/**
 * @brief Compute cache slot for an entry
 *
 * This function computes a hash slot, taking the server id modulo the
 * number of cache slots (which should be prime).
 *
 * @param k [in] The server id (entry index)
 *
 * @return The computed offset.
 */
static inline uint16_t id_cache_offsetof(uint16_t k)
{
	return k % SERVER_BY_ID_CACHE_SIZE;
}
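
/*
 * For illustration only: with SERVER_BY_ID_CACHE_SIZE == 193, server id
 * 193 maps to slot 0 and server id 389 maps to slot 3 (389 - 2*193).
 * Using a prime slot count keeps ids that share a common stride from
 * piling into the same few slots.
 */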

/**
 * @brief Server id comparator for AVL tree walk
 *
 */
static int server_id_cmpf(const struct avltree_node *lhs,
			  const struct avltree_node *rhs)
{
	struct fsal_pnfs_ds *lk, *rk;

	lk = avltree_container_of(lhs, struct fsal_pnfs_ds, ds_node);
	rk = avltree_container_of(rhs, struct fsal_pnfs_ds, ds_node);
	if (lk->id_servers != rk->id_servers)
		return (lk->id_servers < rk->id_servers) ? -1 : 1;
	else
		return 0;
}

/**
 * @brief Allocate the pDS entry.
 *
 * @return pointer to fsal_pnfs_ds.
 * NULL on allocation errors.
 */

struct fsal_pnfs_ds *pnfs_ds_alloc(void)
{
	return gsh_calloc(1, sizeof(struct fsal_pnfs_ds));
}

/**
 * @brief Free the pDS entry.
 */

void pnfs_ds_free(struct fsal_pnfs_ds *pds)
{
	if (!pds->refcount)
		return;

	gsh_free(pds);
}

/**
 * @brief Insert the pDS entry into the AVL tree.
 *
 * @param pds [IN] the server entry
 *
 * @return false on failure.
 */

bool pnfs_ds_insert(struct fsal_pnfs_ds *pds)
{
	struct avltree_node *node;
	void **cache_slot = (void **)
		&(server_by_id.cache[id_cache_offsetof(pds->id_servers)]);

	/* we will hold a ref starting out... */
	assert(pds->refcount == 1);

	PTHREAD_RWLOCK_wrlock(&server_by_id.lock);
	node = avltree_insert(&pds->ds_node, &server_by_id.t);
	if (node) {
		/* somebody beat us to it */
		PTHREAD_RWLOCK_unlock(&server_by_id.lock);
		return false;
	}

	/* update cache */
	atomic_store_voidptr(cache_slot, &pds->ds_node);
	glist_add_tail(&dslist, &pds->ds_list);

	pnfs_ds_get_ref(pds);		/* == 2 */
	if (pds->mds_export != NULL) {
		/* also bump related export for duration */
		get_gsh_export_ref(pds->mds_export);
		pds->mds_export->has_pnfs_ds = true;
	}

	PTHREAD_RWLOCK_unlock(&server_by_id.lock);
	return true;
}
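
/*
 * A minimal usage sketch (the real path is the config commit in
 * pds_commit() below).  The caller is assumed to already hold the
 * initial reference, as asserted above; the id value here is only an
 * example:
 *
 * @code
 *	struct fsal_pnfs_ds *pds = pnfs_ds_alloc();
 *
 *	pds->id_servers = 7;	// example id; normally from the DS block
 *	pds->refcount = 1;	// initial ref, normally taken when the
 *				// FSAL's fsal_pnfs_ds op creates the DS
 *	if (!pnfs_ds_insert(pds))
 *		pnfs_ds_free(pds);	// duplicate id; caller still owns it
 * @endcode
 */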

/**
 * @brief Lookup the fsal_pnfs_ds struct for this server id
 *
 * Lookup the fsal_pnfs_ds struct by id_servers.
 * Server ids are assigned by the config file and carried
 * in file handles.
 *
 * @param id_servers   [IN] the server id extracted from the handle
 *
 * @return pointer to the server with a reference held, or NULL if not found
 */
struct fsal_pnfs_ds *pnfs_ds_get(uint16_t id_servers)
{
	struct fsal_pnfs_ds v;
	struct avltree_node *node;
	struct fsal_pnfs_ds *pds;
	void **cache_slot = (void **)
		&(server_by_id.cache[id_cache_offsetof(id_servers)]);

	v.id_servers = id_servers;
	PTHREAD_RWLOCK_rdlock(&server_by_id.lock);

	/* check cache */
	node = (struct avltree_node *)atomic_fetch_voidptr(cache_slot);
	if (node) {
		pds = avltree_container_of(node, struct fsal_pnfs_ds, ds_node);
		if (pds->id_servers == id_servers) {
			/* got it in 1 */
			LogDebug(COMPONENT_HASHTABLE_CACHE,
				 "server_by_id cache hit slot %d",
				 id_cache_offsetof(id_servers));
			goto out;
		}
	}

	/* fall back to AVL */
	node = avltree_lookup(&v.ds_node, &server_by_id.t);
	if (node) {
		pds = avltree_container_of(node, struct fsal_pnfs_ds, ds_node);
		/* update cache */
		atomic_store_voidptr(cache_slot, node);
	} else {
		PTHREAD_RWLOCK_unlock(&server_by_id.lock);
		return NULL;
	}

 out:
	pnfs_ds_get_ref(pds);
	if (pds->mds_export != NULL)
		/* also bump related export for duration */
		get_gsh_export_ref(pds->mds_export);

	PTHREAD_RWLOCK_unlock(&server_by_id.lock);
	return pds;
}
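
/*
 * A minimal lookup sketch (essentially the duplicate-id probe done in
 * pds_commit() below); id_from_handle stands for a server id taken from
 * a file handle, and every successful pnfs_ds_get() must be balanced by
 * a pnfs_ds_put():
 *
 * @code
 *	struct fsal_pnfs_ds *pds = pnfs_ds_get(id_from_handle);
 *
 *	if (pds != NULL) {
 *		// ... use pds ...
 *		pnfs_ds_put(pds);
 *	}
 * @endcode
 */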

/**
 * @brief Release the fsal_pnfs_ds struct
 *
 * @param pds [IN] the server entry
 */

void pnfs_ds_put(struct fsal_pnfs_ds *pds)
{
	int32_t refcount = atomic_dec_int32_t(&pds->refcount);

	if (refcount != 0) {
		assert(refcount > 0);
		return;
	}

	/* free resources */
	fsal_pnfs_ds_fini(pds);
	gsh_free(pds);
}

/**
 * @brief Remove the pDS entry from the AVL tree.
 *
 * @param id_servers   [IN] the server id extracted from the handle
 * @param final        [IN] Also drop from FSAL.
 */

void pnfs_ds_remove(uint16_t id_servers, bool final)
{
	struct fsal_pnfs_ds v;
	struct avltree_node *node;
	struct fsal_pnfs_ds *pds = NULL;
	void **cache_slot = (void **)
		&(server_by_id.cache[id_cache_offsetof(id_servers)]);

	v.id_servers = id_servers;
	PTHREAD_RWLOCK_wrlock(&server_by_id.lock);

	node = avltree_lookup(&v.ds_node, &server_by_id.t);
	if (node) {
		struct avltree_node *cnode = (struct avltree_node *)
			 atomic_fetch_voidptr(cache_slot);

		/* Remove from the AVL cache and tree */
		if (node == cnode)
			atomic_store_voidptr(cache_slot, NULL);
		avltree_remove(node, &server_by_id.t);

		pds = avltree_container_of(node, struct fsal_pnfs_ds, ds_node);

		/* Remove the DS from the DS list */
		glist_del(&pds->ds_list);

		/* Eliminate repeated locks during draining. Idempotent. */
		pds->pnfs_ds_status = PNFS_DS_STALE;
	}

	PTHREAD_RWLOCK_unlock(&server_by_id.lock);

	/* removal has a once-only semantic */
	if (pds != NULL) {
		if (pds->mds_export != NULL)
			/* special case: avoid lookup of related export.
			 * get_gsh_export_ref() was bumped in pnfs_ds_insert()
			 *
			 * once-only, so no need for lock here.
			 * do not pre-clear related export (mds_export).
			 * always check pnfs_ds_status instead.
			 */
			put_gsh_export(pds->mds_export);

		/* Release the table reference to the server (taken in
		 * pnfs_ds_insert()).  Resources are released on the last
		 * reference, which may or may not be dropped by this call.
		 */
		pnfs_ds_put(pds);

		if (final) {
			/* Also drop from FSAL.  Instead of pDS thread,
			 * relying on export cleanup thread.
			 */
			pnfs_ds_put(pds);
		}
	}
}
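
/*
 * Reference accounting sketch for a DS that is created and then torn down
 * (based on the assert in pnfs_ds_insert() and the puts above):
 *
 *	creation (FSAL/config commit)     refcount == 1
 *	pnfs_ds_insert()                  refcount == 2 (adds table reference)
 *	pnfs_ds_remove(), first put       refcount == 1 (drops table reference)
 *	pnfs_ds_remove(..., final) put    refcount == 0, pnfs_ds_put() frees it
 *
 * so the second put above only frees the entry once no other holder
 * (e.g. an in-flight pnfs_ds_get() caller) still has a reference.
 */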

/**
 * @brief Remove all DSs left in the system
 *
 * Make sure all DSs are freed on shutdown.  This will catch all DSs not
 * associated with an export.
 *
 */
void remove_all_dss(void)
{
	struct glist_head tmplist, *glist, *glistn;
	struct fsal_pnfs_ds *pds;

	glist_init(&tmplist);

	/* pnfs_ds_remove() takes the lock, so move the entire list to a tmp
	 * head under the lock, then process it outside the lock. */
	PTHREAD_RWLOCK_wrlock(&server_by_id.lock);
	glist_splice_tail(&tmplist, &dslist);
	PTHREAD_RWLOCK_unlock(&server_by_id.lock);

	/* Now we can safely process the list without the lock */
	glist_for_each_safe(glist, glistn, &tmplist) {
		pds = glist_entry(glist, struct fsal_pnfs_ds, ds_list);
		pnfs_ds_remove(pds->id_servers, true);
	}
}

/**
 * @brief Commit a FSAL sub-block
 *
 * Use the Name parameter passed in via the self_struct to look up the
 * FSAL.  If the FSAL is not loaded (yet), load it and call its init.
 *
 * Create the pDS and pass the FSAL sub-block to it so that the
 * FSAL method can process the rest of the parameters in the block.
 */

static int fsal_cfg_commit(void *node, void *link_mem, void *self_struct,
		       struct config_error_type *err_type)
{
	struct fsal_args *fp = self_struct;
	struct fsal_module **pds_fsal = link_mem;
	struct fsal_pnfs_ds *pds =
		container_of(pds_fsal, struct fsal_pnfs_ds, fsal);
	struct fsal_module *fsal;
	struct root_op_context root_op_context;
	fsal_status_t status;
	int errcnt;

	/* Initialize req_ctx */
	init_root_op_context(&root_op_context, NULL, NULL, 0, 0,
			     UNKNOWN_REQUEST);

	errcnt = fsal_load_init(node, fp->name, &fsal, err_type);
	if (errcnt > 0)
		goto err;

	status = fsal->m_ops.fsal_pnfs_ds(fsal, node, &pds);
	if (status.major != ERR_FSAL_NO_ERROR) {
		fsal_put(fsal);
		LogCrit(COMPONENT_CONFIG,
			"Could not create pNFS DS");
		LogFullDebug(COMPONENT_FSAL,
			     "FSAL %s refcount %"PRIu32,
			     fsal->name,
			     atomic_fetch_int32_t(&fsal->refcount));
		err_type->init = true;
		errcnt++;
		goto err;
	}

	LogEvent(COMPONENT_CONFIG,
		 "DS %d fsal config commit at FSAL (%s) with path (%s)",
		 pds->id_servers, pds->fsal->name, pds->fsal->path);

err:
	release_root_op_context();
	/* Don't leak the FSAL block */
	err_type->dispose = true;
	return errcnt;
}

/**
 * @brief pNFS DS block handlers
 */

/**
 * @brief Initialize the DS block
 */

static void *pds_init(void *link_mem, void *self_struct)
{
	static struct fsal_pnfs_ds special_ds;

	if (link_mem == (void *)~0UL) {
		/* This is the special case of no config.  We cannot malloc
		 * this, as it's never committed, so it's leaked. */
		memset(&special_ds, 0, sizeof(special_ds));
		return &special_ds;
	} else if (self_struct == NULL) {
		return pnfs_ds_alloc();
	} else { /* free resources case */
		pnfs_ds_free(self_struct);
		return NULL;
	}
}

/**
 * @brief Commit the DS block
 *
 * Validate the DS level parameters.  The FSAL and client
 * sub-block parameters are already done.
 */

static int pds_commit(void *node, void *link_mem, void *self_struct,
		      struct config_error_type *err_type)
{
	struct fsal_pnfs_ds *pds = self_struct;
	struct fsal_pnfs_ds *probe = pnfs_ds_get(pds->id_servers);

	/* redundant probe before insert??? */
	if (probe != NULL) {
		LogDebug(COMPONENT_CONFIG,
			 "Server %d already exists!",
			 pds->id_servers);
		pnfs_ds_put(probe);
		err_type->exists = true;
		return 1;
	}

	if (!pnfs_ds_insert(pds)) {
		LogCrit(COMPONENT_CONFIG,
			"Server id %d already in use.",
			pds->id_servers);
		err_type->exists = true;
		return 1;
	}

	LogEvent(COMPONENT_CONFIG,
		 "DS %d created at FSAL (%s) with path (%s)",
		 pds->id_servers, pds->fsal->name, pds->fsal->path);
	return 0;
}

/**
 * @brief Display the DS block
 */

static void pds_display(const char *step, void *node,
		       void *link_mem, void *self_struct)
{
	struct fsal_pnfs_ds *pds = self_struct;
	struct fsal_module *fsal = pds->fsal;

	LogMidDebug(COMPONENT_CONFIG,
		    "%s %p DS %d FSAL (%s) with path (%s)",
		    step, pds, pds->id_servers, fsal->name, fsal->path);
}

/**
 * @brief Table of FSAL sub-block parameters
 *
 * NOTE: this points to a struct that is private to
 * fsal_cfg_commit.
 */

static struct config_item fsal_params[] = {
	CONF_ITEM_STR("Name", 1, 10, NULL,
		      fsal_args, name), /* cheater union */
	CONFIG_EOL
};

/**
 * @brief Table of DS block parameters
 *
 * NOTE: the Client and FSAL sub-blocks must be the *last*
 * two entries in the list.  This is so all other
 * parameters have been processed before these sub-blocks
 * are processed.
 */

static struct config_item pds_items[] = {
	CONF_ITEM_UI16("Number", 0, UINT16_MAX, 0,
		       fsal_pnfs_ds, id_servers),
	CONF_RELAX_BLOCK("FSAL", fsal_params,
			 fsal_init, fsal_cfg_commit,
			 fsal_pnfs_ds, fsal),
	CONFIG_EOL
};
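
/*
 * For illustration, a DS block in the configuration file might look like
 * the sketch below (built from the items above; the FSAL name "VFS" is
 * only an example of a loadable FSAL):
 *
 * @code
 *	DS
 *	{
 *		Number = 1;
 *		FSAL {
 *			Name = "VFS";
 *		}
 *	}
 * @endcode
 */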

/**
 * @brief Top level definition for each DS block
 */

static struct config_block pds_block = {
	.dbus_interface_name = "org.ganesha.nfsd.config.ds.%d",
	.blk_desc.name = "DS",
	.blk_desc.type = CONFIG_BLOCK,
	.blk_desc.u.blk.init = pds_init,
	.blk_desc.u.blk.params = pds_items,
	.blk_desc.u.blk.commit = pds_commit,
	.blk_desc.u.blk.display = pds_display
};

/**
 * @brief Read the DS blocks from the parsed configuration file.
 *
 * @param[in]  in_config    The file that contains the DS list
 * @param[out] err_type     Error state from config processing
 *
 * @return A negative value on error;
 *         otherwise, the number of DS blocks.
 */

int ReadDataServers(config_file_t in_config,
		    struct config_error_type *err_type)
{
	int rc;

	rc = load_config_from_parse(in_config,
				    &pds_block,
				    NULL,
				    false,
				    err_type);
	if (!config_error_is_harmless(err_type))
		return -1;

	return rc;
}

/**
 * @brief Initialize server tree
 */

void server_pkginit(void)
{
	pthread_rwlockattr_t rwlock_attr;

	pthread_rwlockattr_init(&rwlock_attr);
#ifdef GLIBC
	pthread_rwlockattr_setkind_np(
		&rwlock_attr,
		PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP);
#endif
	PTHREAD_RWLOCK_init(&server_by_id.lock, &rwlock_attr);
	avltree_init(&server_by_id.t, server_id_cmpf, 0);
	glist_init(&dslist);
	memset(&server_by_id.cache, 0, sizeof(server_by_id.cache));
	pthread_rwlockattr_destroy(&rwlock_attr);
}