1    	/*
2    	 * vim:noexpandtab:shiftwidth=8:tabstop=8:
3    	 *
4    	 * Copyright CEA/DAM/DIF  (2008)
5    	 * contributeur : Philippe DENIEL   philippe.deniel@cea.fr
6    	 *                Thomas LEIBOVICI  thomas.leibovici@cea.fr
7    	 *
8    	 *
9    	 * This program is free software; you can redistribute it and/or
10   	 * modify it under the terms of the GNU Lesser General Public
11   	 * License as published by the Free Software Foundation; either
12   	 * version 3 of the License, or (at your option) any later version.
13   	 *
14   	 * This program is distributed in the hope that it will be useful,
15   	 * but WITHOUT ANY WARRANTY; without even the implied warranty of
16   	 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17   	 * Lesser General Public License for more details.
18   	 *
19   	 * You should have received a copy of the GNU Lesser General Public
20   	 * License along with this library; if not, write to the Free Software
21   	 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22   	 * 02110-1301 USA
23   	 *
24   	 * ---------------------------------------
25   	 */
26   	
27   	/**
28   	 * @addtogroup hashtable
29   	 * @{
30   	 */
31   	
32   	/**
33   	 * @file hashtable.c
34   	 * @brief Implement an RBTree-based partitioend hash lookup
35   	 *
36   	 * This file implements a partitioned, tree-based, concurrent
37   	 * hash-lookup structure.  For every key, two values are derived that
38   	 * determine its location within the structure: an index, which
39   	 * determines which of the partitions (each containing a tree and each
40   	 * separately locked), and a hash which acts as the key within an
41   	 * individual Red-Black Tree.
42   	 */
43   	
44   	#include "config.h"
45   	
46   	#include <stdio.h>
47   	#include <string.h>
48   	#include <pthread.h>
49   	#include "hashtable.h"
50   	#include "log.h"
51   	#include "abstract_atomic.h"
52   	#include "common_utils.h"
53   	#include <assert.h>
54   	
55   	/**
56   	 * @brief Total size of the cache page configured for a table
57   	 *
58   	 * This function returns the size of the cache page for the given hash
59   	 * table, based on the configured entry count.
60   	 *
61   	 * @param[in] ht The hash table to query
62   	 *
63   	 * @return The cache page size
64   	 */
65   	static inline size_t
66   	cache_page_size(const hash_table_t *ht)
67   	{
68   		return (ht->parameter.cache_entry_count) * sizeof(struct rbt_node *);
69   	}
70   	
71   	/**
72   	 * @brief Offset of a pointer in the cache
73   	 *
74   	 * This function returns the offset into a cache array of the given
75   	 * hash value.
76   	 *
77   	 * @param[in] ht      The hash table to query
78   	 * @param[in] rbthash The hash value to look up
79   	 *
80   	 * @return the offset into the cache at which the hash value might be
81   	 *         found
82   	 */
83   	static inline int
84   	cache_offsetof(struct hash_table *ht, uint64_t rbthash)
85   	{
86   		return rbthash % ht->parameter.cache_entry_count;
87   	}
88   	
89   	/**
90   	 * @brief Return an error string for an error code
91   	 *
92   	 * This function returns an error string corresponding to the supplied
93   	 * error code.
94   	 *
95   	 * @param[in] err The error code to look up
96   	 *
97   	 * @return An error string or "UNKNOWN HASH TABLE ERROR"
98   	 */
99   	const char *
100  	hash_table_err_to_str(hash_error_t err)
101  	{
102  		switch (err) {
103  		case HASHTABLE_SUCCESS:
104  			return "HASHTABLE_SUCCESS";
105  		case HASHTABLE_UNKNOWN_HASH_TYPE:
106  			return "HASHTABLE_UNKNOWN_HASH_TYPE";
107  		case HASHTABLE_ERROR_NO_SUCH_KEY:
108  			return "HASHTABLE_ERROR_NO_SUCH_KEY";
109  		case HASHTABLE_ERROR_KEY_ALREADY_EXISTS:
110  			return "HASHTABLE_ERROR_KEY_ALREADY_EXISTS";
111  		case HASHTABLE_ERROR_INVALID_ARGUMENT:
112  			return "HASHTABLE_ERROR_INVALID_ARGUMENT";
113  		case HASHTABLE_ERROR_DELALL_FAIL:
114  			return "HASHTABLE_ERROR_DELALL_FAIL";
115  		case HASHTABLE_NOT_DELETED:
116  			return "HASHTABLE_NOT_DELETED";
117  		case HASHTABLE_OVERWRITTEN:
118  			return "HASHTABLE_OVERWRITTEN";
119  		}
120  	
121  		return "UNKNOWN HASH TABLE ERROR";
122  	}
123  	
124  	/**
125  	 * @brief Locate a key within a partition
126  	 *
127  	 * This function traverses the red-black tree within a hash table
128  	 * partition and returns, if one exists, a pointer to a node matching
129  	 * the supplied key.
130  	 *
131  	 * @param[in]  ht      The hashtable to be used
132  	 * @param[in]  key     The key to look up
133  	 * @param[in]  index   Index into RBT array
134  	 * @param[in]  rbthash Hash in red-black tree
135  	 * @param[out] node    On success, the found node, NULL otherwise
136  	 *
137  	 * @retval HASHTABLE_SUCCESS if successfull
138  	 * @retval HASHTABLE_NO_SUCH_KEY if key was not found
139  	 */
140  	static hash_error_t
141  	key_locate(struct hash_table *ht, const struct gsh_buffdesc *key,
142  		   uint32_t index, uint64_t rbthash, struct rbt_node **node)
143  	{
144  		/* The current partition */
145  		struct hash_partition *partition = &(ht->partitions[index]);
146  	
147  		/* The root of the red black tree matching this index */
148  		struct rbt_head *root = NULL;
149  	
150  		/* A pair of buffer descriptors locating key and value for this
151  		   entry */
152  		struct hash_data *data = NULL;
153  	
154  		/* The node in the red-black tree currently being traversed */
155  		struct rbt_node *cursor = NULL;
156  	
157  		/* true if we have located the key */
158  		int found = false;
159  	
160  		*node = NULL;
161  	
162  		if (partition->cache) {
163  			void **cache_slot = (void **)
164  			    &(partition->cache[cache_offsetof(ht, rbthash)]);
165  			cursor = atomic_fetch_voidptr(cache_slot);
166  			LogFullDebug(COMPONENT_HASHTABLE_CACHE,
167  				     "hash %s index %" PRIu32 " slot %d",
168  				     (cursor) ? "hit" : "miss", index,
169  				     cache_offsetof(ht, rbthash));
170  			if (cursor) {
171  				data = RBT_OPAQ(cursor);
172  				if (ht->parameter.compare_key(
173  							(struct gsh_buffdesc *)key,
174  							&(data->key)) == 0) {
175  					goto out;
176  				}
177  			}
178  		}
179  	
180  		root = &(ht->partitions[index].rbt);
181  	
182  		/* The lefmost occurrence of the value is the one from which we
183  		   may start iteration to visit all nodes containing a value. */
184  		RBT_FIND_LEFT(root, cursor, rbthash);
185  	
186  		if (cursor == NULL) {
187  			if (isFullDebug(COMPONENT_HASHTABLE)
188  			    && isFullDebug(ht->parameter.ht_log_component))
189  				LogFullDebug(ht->parameter.ht_log_component,
190  					     "Key not found: rbthash = %" PRIu64,
191  					     rbthash);
192  			return HASHTABLE_ERROR_NO_SUCH_KEY;
193  		}
194  	
195  		while ((cursor != NULL) && (RBT_VALUE(cursor) == rbthash)) {
196  			data = RBT_OPAQ(cursor);
197  			if (ht->parameter.compare_key((struct gsh_buffdesc *)key,
198  						      &(data->key)) == 0) {
199  				if (partition->cache) {
200  					void **cache_slot = (void **)
201  					    &partition->cache[cache_offsetof(ht,
202  									     rbthash)];
203  					atomic_store_voidptr(cache_slot, cursor);
204  				}
205  				found = true;
206  				break;
207  			}
208  			RBT_INCREMENT(cursor);
209  		}
210  	
211  		if (!found) {
212  			if (isFullDebug(COMPONENT_HASHTABLE)
213  			    && isFullDebug(ht->parameter.ht_log_component))
214  				LogFullDebug(ht->parameter.ht_log_component,
215  					     "Matching hash found, but no matching key.");
216  			return HASHTABLE_ERROR_NO_SUCH_KEY;
217  		}
218  	
219  	 out:
220  		*node = cursor;
221  	
222  		return HASHTABLE_SUCCESS;
223  	}
224  	
225  	/**
226  	 * @brief Compute the values to search a hash store
227  	 *
228  	 * This function computes the index and RBT hash values for the
229  	 * specified key.
230  	 *
231  	 * @param[in]  ht       The hash table whose parameters determine computation
232  	 * @param[in]  key      The key from which to compute the values
233  	 * @param[out] index    The partition index
234  	 * @param[out] rbt_hash The hash in the Red-Black tree
235  	 *
236  	 * @retval HASHTABLE_SUCCESS if values computed
237  	 * @retval HASHTABLE_ERROR_INVALID_ARGUMENT if the supplied function
238  	 *         fails
239  	 */
240  	
241  	static inline hash_error_t
242  	compute(struct hash_table *ht, const struct gsh_buffdesc *key,
243  		uint32_t *index, uint64_t *rbt_hash)
244  	{
245  		/* Compute the partition index and red-black tree hash */
246  		if (ht->parameter.hash_func_both) {
247  			if (!(*(ht->parameter.hash_func_both))
248  			    (&ht->parameter, (struct gsh_buffdesc *)key, index,
249  			     rbt_hash))
250  				return HASHTABLE_ERROR_INVALID_ARGUMENT;
251  		} else {
252  			*index =
253  			    (*(ht->parameter.hash_func_key)) (&ht->parameter,
254  							      (struct gsh_buffdesc *)
255  							      key);
256  			*rbt_hash =
257  			    (*(ht->parameter.hash_func_rbt)) (&ht->parameter,
258  							      (struct gsh_buffdesc *)
259  							      key);
260  		}
261  	
262  		/* At the suggestion of Jim Lieb, die if a hash function sends
263  		   us off the end of the array. */
264  	
265  		assert(*index < ht->parameter.index_size);
266  	
267  		return HASHTABLE_SUCCESS;
268  	}
269  	
270  	/* The following are the hash table primitives implementing the
271  	   actual functionality. */
272  	
273  	/**
274  	 * @brief Initialize a new hash table
275  	 *
276  	 * This function initializes and allocates storage for a hash table.
277  	 *
278  	 * @param[in] hparam Parameters to determine the hash table's
279  	 *                    behaviour
280  	 *
281  	 * @return Pointer to the new hash table, NULL on failure
282  	 *
283  	 */
284  	
285  	struct hash_table *
286  	hashtable_init(struct hash_param *hparam)
287  	{
288  		/* The hash table being constructed */
289  		struct hash_table *ht = NULL;
290  		/* The index for initializing each partition */
291  		uint32_t index = 0;
292  		/* Read-Write Lock attributes, to prevent write starvation under
293  		   GLIBC */
294  		pthread_rwlockattr_t rwlockattr;
295  		/* Hash partition */
296  		struct hash_partition *partition = NULL;
297  		/* The number of fully initialized partitions */
298  		uint32_t completed = 0;
299  	
300  		if (pthread_rwlockattr_init(&rwlockattr) != 0)
301  			return NULL;
302  	
303  		/* At some point factor this out into the OS directory.  it is
304  		   necessary to prevent writer starvation under GLIBC. */
305  	#ifdef GLIBC
306  		if ((pthread_rwlockattr_setkind_np
307  		     (&rwlockattrs,
308  		      PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP)) != 0) {
309  			LogCrit(COMPONENT_HASHTABLE,
310  				"Unable to set writer-preference on lock attribute.");
311  			goto deconstruct;
312  		}
313  	#endif				/* GLIBC */
314  	
315  		ht = gsh_calloc(1, sizeof(struct hash_table) +
316  				(sizeof(struct hash_partition) *
317  				 hparam->index_size));
318  	
319  		/* Fixup entry size */
320  		if (hparam->flags & HT_FLAG_CACHE) {
321  			if (!hparam->cache_entry_count)
322  				/* works fine with a good hash algo */
323  				hparam->cache_entry_count = 32767;
324  		}
325  	
326  		/* We need to save copy of the parameters in the table. */
327  		ht->parameter = *hparam;
328  		for (index = 0; index < hparam->index_size; ++index) {
329  			partition = (&ht->partitions[index]);
330  			RBT_HEAD_INIT(&(partition->rbt));
331  	
332  			if (pthread_rwlock_init(&partition->lock, &rwlockattr) != 0) {
333  				LogCrit(COMPONENT_HASHTABLE,
334  					"Unable to initialize lock in hash table.");
335  				goto deconstruct;
336  			}
337  	
338  			/* Allocate a cache if requested */
339  			if (hparam->flags & HT_FLAG_CACHE)
340  				partition->cache = gsh_calloc(1, cache_page_size(ht));
341  	
342  			completed++;
343  		}
344  	
345  		ht->node_pool = pool_basic_init(NULL, sizeof(rbt_node_t));
346  		ht->data_pool = pool_basic_init(NULL, sizeof(struct hash_data));
347  	
348  		pthread_rwlockattr_destroy(&rwlockattr);
349  		return ht;
350  	
351  	 deconstruct:
352  	
353  		while (completed != 0) {
354  			if (hparam->flags & HT_FLAG_CACHE)
355  				gsh_free(ht->partitions[completed - 1].cache);
356  	
357  			PTHREAD_RWLOCK_destroy(&(ht->partitions[completed - 1].lock));
358  			completed--;
359  		}
360  		if (ht->node_pool)
361  			pool_destroy(ht->node_pool);
362  		if (ht->data_pool)
363  			pool_destroy(ht->data_pool);
364  	
365  		gsh_free(ht);
366  		return ht = NULL;
367  	}
368  	
369  	/**
370  	 * @brief Dispose of a hash table
371  	 *
372  	 * This function deletes all the entries from the given hash table and
373  	 * then destroys the hash table.
374  	 *
375  	 * @param[in,out] ht        Pointer to the hash table.  After calling
376  	 *                          this function, the memory pointed to by ht
377  	 *                          must not be accessed in any way.
378  	 * @param[in]     free_func Function to free entries as they are
379  	 *                          deleted
380  	 *
381  	 * @return HASHTABLE_SUCCESS on success, other things on failure
382  	 */
383  	hash_error_t
384  	hashtable_destroy(struct hash_table *ht,
385  			  int (*free_func)(struct gsh_buffdesc,
386  					   struct gsh_buffdesc))
387  	{
388  		size_t index = 0;
389  		hash_error_t hrc = HASHTABLE_SUCCESS;
390  	
391  		hrc = hashtable_delall(ht, free_func);
392  		if (hrc != HASHTABLE_SUCCESS)
393  			goto out;
394  	
395  		for (index = 0; index < ht->parameter.index_size; ++index) {
396  			if (ht->partitions[index].cache) {
397  				gsh_free(ht->partitions[index].cache);
398  				ht->partitions[index].cache = NULL;
399  			}
400  	
401  			PTHREAD_RWLOCK_destroy(&(ht->partitions[index].lock));
402  		}
403  		pool_destroy(ht->node_pool);
404  		pool_destroy(ht->data_pool);
405  		gsh_free(ht);
406  	
407  	 out:
408  		return hrc;
409  	}
410  	
411  	/**
412  	 * @brief acquire the partition lock for the given key
413  	 *
414  	 * This function acquires the partition write mode lock corresponding to
415  	 * the given key.
416  	 *
417  	 * @brief[in]  ht        The hash table to search
418  	 * @brief[in]  key       The key for which to acquire the partition lock
419  	 * @brief[out] latch     Opaque structure holding partition information
420  	 *
421  	 * @retval HASHTABLE_SUCCESS, the partition lock is acquired.
422  	 * @retval Others, failure, the partition lock is not acquired
423  	 *
424  	 * NOTES: fast path if the caller just needs to lock the partition and
425  	 * doesn't need to look for the entry. The lock needs to be released
426  	 * with hashtable_releaselatched()
427  	 */
428  	hash_error_t
429  	hashtable_acquire_latch(struct hash_table *ht,
430  				const struct gsh_buffdesc *key,
431  				struct hash_latch *latch)
432  	{
433  		uint32_t index;
434  		uint64_t rbt_hash;
435  		hash_error_t rc = HASHTABLE_SUCCESS;
436  	
437  		memset(latch, 0, sizeof(struct hash_latch));
438  		rc = compute(ht, key, &index, &rbt_hash);
439  		if (rc != HASHTABLE_SUCCESS)
440  			return rc;
441  	
442  		latch->index = index;
443  		PTHREAD_RWLOCK_wrlock(&(ht->partitions[index].lock));
444  	
445  		return HASHTABLE_SUCCESS;
446  	}
447  	
448  	/**
449  	 * @brief Look up an entry, latching the table
450  	 *
451  	 * This function looks up an entry in the hash table and latches the
452  	 * partition in which that entry would belong in preparation for other
453  	 * activities.  This function is a primitive and is intended more for
454  	 * use building other access functions than for client code itself.
455  	 *
456  	 * @brief[in]  ht        The hash table to search
457  	 * @brief[in]  key       The key for which to search
458  	 * @brief[out] val       The value found
459  	 * @brief[in]  may_write This must be true if the followup call might
460  	 *                       mutate the hash table (set or delete)
461  	 * @brief[out] latch     Opaque structure holding information on the
462  	 *                       table.
463  	 *
464  	 * @retval HASHTABLE_SUCCESS The entry was found, the table is
465  	 *         latched.
466  	 * @retval HASHTABLE_ERROR_NOT_FOUND The entry was not found, the
467  	 *         table is latched.
468  	 * @retval Others, failure, the table is not latched.
469  	 */
470  	hash_error_t
471  	hashtable_getlatch(struct hash_table *ht,
472  			   const struct gsh_buffdesc *key,
473  			   struct gsh_buffdesc *val, bool may_write,
474  			   struct hash_latch *latch)
475  	{
476  		/* The index specifying the partition to search */
477  		uint32_t index = 0;
478  		/* The node found for the key */
479  		struct rbt_node *locator = NULL;
480  		/* The buffer descritpros for the key and value for the found entry */
481  		struct hash_data *data = NULL;
482  		/* The hash value to be searched for within the Red-Black tree */
483  		uint64_t rbt_hash = 0;
484  		/* Stored error return */
485  		hash_error_t rc = HASHTABLE_SUCCESS;
486  	
487  		/* This combination of options makes no sense ever */
(1) Event cond_true: Condition "may_write", taking true branch.
(2) Event cond_false: Condition "!latch", taking false branch.
(3) Event if_fallthrough: Falling through to end of if statement.
(4) Event if_end: End of if statement.
488  		assert(!(may_write && !latch));
489  	
490  		rc = compute(ht, key, &index, &rbt_hash);
(5) Event cond_false: Condition "rc != HASHTABLE_SUCCESS", taking false branch.
491  		if (rc != HASHTABLE_SUCCESS)
(6) Event if_end: End of if statement.
492  			return rc;
493  	
494  		/* Acquire mutex */
(7) Event cond_true: Condition "may_write", taking true branch.
495  		if (may_write)
(8) Event lock: "pthread_rwlock_wrlock" locks "ht->partitions[index].lock".
(9) Event cond_true: Condition "rc == 0", taking true branch.
(10) Event cond_true: Condition "!!(component_log_level[COMPONENT_RW_LOCK] >= NIV_FULL_DEBUG)", taking true branch.
(11) Event cond_true: Condition "!!(component_log_level[COMPONENT_RW_LOCK] >= NIV_FULL_DEBUG)", taking true branch.
(12) Event if_fallthrough: Falling through to end of if statement.
(13) Event if_end: End of if statement.
(14) Event if_fallthrough: Falling through to end of if statement.
Also see events: [missing_unlock]
496  			PTHREAD_RWLOCK_wrlock(&(ht->partitions[index].lock));
497  		else
(15) Event if_end: End of if statement.
498  			PTHREAD_RWLOCK_rdlock(&(ht->partitions[index].lock));
499  	
500  		rc = key_locate(ht, key, index, rbt_hash, &locator);
501  	
(16) Event cond_true: Condition "rc == HASHTABLE_SUCCESS", taking true branch.
502  		if (rc == HASHTABLE_SUCCESS) {
503  			/* Key was found */
504  			data = RBT_OPAQ(locator);
(17) Event cond_true: Condition "val", taking true branch.
505  			if (val) {
506  				val->addr = data->val.addr;
507  				val->len = data->val.len;
508  			}
509  	
(18) Event cond_true: Condition "!!(component_log_level[COMPONENT_HASHTABLE] >= NIV_DEBUG)", taking true branch.
(19) Event cond_true: Condition "!!(component_log_level[COMPONENT_HASHTABLE] >= NIV_DEBUG)", taking true branch.
(20) Event cond_true: Condition "!!(component_log_level[ht->parameter.ht_log_component] >= NIV_FULL_DEBUG)", taking true branch.
(21) Event cond_true: Condition "!!(component_log_level[ht->parameter.ht_log_component] >= NIV_FULL_DEBUG)", taking true branch.
510  			if (isDebug(COMPONENT_HASHTABLE)
511  			    && isFullDebug(ht->parameter.ht_log_component)) {
512  				char dispval[HASHTABLE_DISPLAY_STRLEN];
513  	
(22) Event cond_true: Condition "ht->parameter.val_to_str != NULL", taking true branch.
514  				if (ht->parameter.val_to_str != NULL)
(23) Event if_fallthrough: Falling through to end of if statement.
515  					ht->parameter.val_to_str(&data->val, dispval);
516  				else
(24) Event if_end: End of if statement.
517  					dispval[0] = '\0';
518  	
(25) Event cond_true: Condition "!!(component_log_level[ht->parameter.ht_log_component] >= NIV_FULL_DEBUG)", taking true branch.
(26) Event cond_true: Condition "!!(component_log_level[ht->parameter.ht_log_component] >= NIV_FULL_DEBUG)", taking true branch.
519  				LogFullDebug(ht->parameter.ht_log_component,
520  					     "Get %s returning Value=%p {%s}",
521  					     ht->parameter.ht_name, data->val.addr,
522  					     dispval);
523  			}
524  		}
525  	
(27) Event cond_true: Condition "rc == HASHTABLE_SUCCESS", taking true branch.
(28) Event cond_true: Condition "latch != NULL", taking true branch.
526  		if (((rc == HASHTABLE_SUCCESS) || (rc == HASHTABLE_ERROR_NO_SUCH_KEY))
527  		    && (latch != NULL)) {
528  			latch->index = index;
529  			latch->rbt_hash = rbt_hash;
530  			latch->locator = locator;
(29) Event if_fallthrough: Falling through to end of if statement.
531  		} else {
532  			PTHREAD_RWLOCK_unlock(&ht->partitions[index].lock);
(30) Event if_end: End of if statement.
533  		}
534  	
(31) Event cond_false: Condition "rc != HASHTABLE_SUCCESS", taking false branch.
535  		if (rc != HASHTABLE_SUCCESS && isDebug(COMPONENT_HASHTABLE)
536  		    && isFullDebug(ht->parameter.ht_log_component))
(32) Event if_end: End of if statement.
537  			LogFullDebug(ht->parameter.ht_log_component,
538  				     "Get %s returning failure %s",
539  				     ht->parameter.ht_name, hash_table_err_to_str(rc));
540  	
(33) Event missing_unlock: Returning without unlocking "ht->partitions[index].lock".
Also see events: [lock]
541  		return rc;
542  	}
543  	
544  	/**
545  	 *
546  	 * @brief Release lock held on hash table
547  	 *
548  	 * This function releases the lock on the hash partition acquired and
549  	 * retained by a call to hashtable_getlatch.  This function must be
550  	 * used to free any acquired lock but ONLY if the lock was not already
551  	 * freed by some other means (hashtable_setlatched or
552  	 * HashTable_DelLatched).
553  	 *
554  	 * @param[in] ht    The hash table with the lock to be released
555  	 * @param[in] latch The latch structure holding retained state
556  	 */
557  	
558  	void
559  	hashtable_releaselatched(struct hash_table *ht, struct hash_latch *latch)
560  	{
561  		if (latch) {
562  			PTHREAD_RWLOCK_unlock(&ht->partitions[latch->index].lock);
563  			memset(latch, 0, sizeof(struct hash_latch));
564  		}
565  	}
566  	
567  	/**
568  	 * @brief Set a value in a table following a previous GetLatch
569  	 *
570  	 * This function sets a value in a hash table following a previous
571  	 * call to the hashtable_getlatch function.  It must only be used
572  	 * after such a call made with the may_write parameter set to true.
573  	 * In all cases, the lock on the hash table is released.
574  	 *
575  	 * @param[in,out] ht          The hash store to be modified
576  	 * @param[in]     key         A buffer descriptor locating the key to set
577  	 * @param[in]     val         A buffer descriptor locating the value to insert
578  	 * @param[in]     latch       A pointer to a structure filled by a previous
579  	 *                            call to hashtable_getlatched.
580  	 * @param[in]     overwrite   If true, overwrite a prexisting key,
581  	 *                            otherwise return error on collision.
582  	 * @param[out]    stored_key If non-NULL, a buffer descriptor for an
583  	 *                           overwritten key as stored.
584  	 * @param[out]    stored_val If non-NULL, a buffer descriptor for an
585  	 *                           overwritten value as stored.
586  	 *
587  	 * @retval HASHTABLE_SUCCESS on non-colliding insert
588  	 * @retval HASHTABLE_ERROR_KEY_ALREADY_EXISTS if overwrite disabled
589  	 * @retval HASHTABLE_OVERWRITTEN on successful overwrite
590  	 * @retval Other errors on failure
591  	 */
592  	
593  	hash_error_t
594  	hashtable_setlatched(struct hash_table *ht,
595  			     struct gsh_buffdesc *key,
596  			     struct gsh_buffdesc *val,
597  			     struct hash_latch *latch, int overwrite,
598  			     struct gsh_buffdesc *stored_key,
599  			     struct gsh_buffdesc *stored_val)
600  	{
601  		/* Stored error return */
602  		hash_error_t rc = HASHTABLE_SUCCESS;
603  		/* The pair of buffer descriptors locating both key and value
604  		   for this object, what actually gets stored. */
605  		struct hash_data *descriptors = NULL;
606  		/* Node giving the location to insert new node */
607  		struct rbt_node *locator = NULL;
608  		/* New node for the case of non-overwrite */
609  		struct rbt_node *mutator = NULL;
610  	
611  		if (isDebug(COMPONENT_HASHTABLE)
612  		    && isFullDebug(ht->parameter.ht_log_component)) {
613  			char dispkey[HASHTABLE_DISPLAY_STRLEN];
614  			char dispval[HASHTABLE_DISPLAY_STRLEN];
615  	
616  			if (ht->parameter.key_to_str != NULL)
617  				ht->parameter.key_to_str(key, dispkey);
618  			else
619  				dispkey[0] = '\0';
620  	
621  			if (ht->parameter.val_to_str != NULL)
622  				ht->parameter.val_to_str(val, dispval);
623  			else
624  				dispval[0] = '\0';
625  	
626  			LogFullDebug(ht->parameter.ht_log_component,
627  				     "Set %s Key=%p {%s} Value=%p {%s} index=%" PRIu32
628  				     " rbt_hash=%" PRIu64, ht->parameter.ht_name,
629  				     key->addr, dispkey, val->addr, dispval,
630  				     latch->index, latch->rbt_hash);
631  		}
632  	
633  		/* In the case of collision */
634  		if (latch->locator) {
635  			if (!overwrite) {
636  				rc = HASHTABLE_ERROR_KEY_ALREADY_EXISTS;
637  				goto out;
638  			}
639  	
640  			descriptors = RBT_OPAQ(latch->locator);
641  	
642  			if (isDebug(COMPONENT_HASHTABLE)
643  			    && isFullDebug(ht->parameter.ht_log_component)) {
644  				char dispkey[HASHTABLE_DISPLAY_STRLEN];
645  				char dispval[HASHTABLE_DISPLAY_STRLEN];
646  	
647  				if (ht->parameter.key_to_str != NULL)
648  					ht->parameter.key_to_str(&descriptors->key,
649  								 dispkey);
650  				else
651  					dispkey[0] = '\0';
652  	
653  				if (ht->parameter.val_to_str != NULL)
654  					ht->parameter.val_to_str(&descriptors->val,
655  								 dispval);
656  				else
657  					dispval[0] = '\0';
658  	
659  				LogFullDebug(ht->parameter.ht_log_component,
660  					     "Set %s Key=%p {%s} Value=%p {%s} index=%"
661  					     PRIu32" rbt_hash=%"PRIu64" was replaced",
662  					     ht->parameter.ht_name,
663  					     descriptors->key.addr, dispkey,
664  					     descriptors->val.addr, dispval,
665  					     latch->index, latch->rbt_hash);
666  			}
667  	
668  			if (stored_key)
669  				*stored_key = descriptors->key;
670  	
671  			if (stored_val)
672  				*stored_val = descriptors->val;
673  	
674  			descriptors->key = *key;
675  			descriptors->val = *val;
676  			rc = HASHTABLE_OVERWRITTEN;
677  			goto out;
678  		}
679  	
680  		/* We have no collision, so go about creating and inserting a new
681  		   node. */
682  	
683  		RBT_FIND(&ht->partitions[latch->index].rbt, locator, latch->rbt_hash);
684  	
685  		mutator = pool_alloc(ht->node_pool);
686  	
687  		descriptors = pool_alloc(ht->data_pool);
688  	
689  		RBT_OPAQ(mutator) = descriptors;
690  		RBT_VALUE(mutator) = latch->rbt_hash;
691  		RBT_INSERT(&ht->partitions[latch->index].rbt, mutator, locator);
692  	
693  		descriptors->key.addr = key->addr;
694  		descriptors->key.len = key->len;
695  	
696  		descriptors->val.addr = val->addr;
697  		descriptors->val.len = val->len;
698  	
699  		/* Only in the non-overwrite case */
700  		++ht->partitions[latch->index].count;
701  	
702  		rc = HASHTABLE_SUCCESS;
703  	
704  	 out:
705  		hashtable_releaselatched(ht, latch);
706  	
707  		if (rc != HASHTABLE_SUCCESS && isDebug(COMPONENT_HASHTABLE)
708  		    && isFullDebug(ht->parameter.ht_log_component))
709  			LogFullDebug(ht->parameter.ht_log_component,
710  				     "Set %s returning failure %s",
711  				     ht->parameter.ht_name, hash_table_err_to_str(rc));
712  	
713  		return rc;
714  	}
715  	
716  	/**
717  	 * @brief Delete a value from the store following a previous GetLatch
718  	 *
719  	 * This function removes a value from the a hash store, the value
720  	 * already having been looked up with GetLatched. In all cases, the
721  	 * lock is retained. hashtable_getlatch must have been called with
722  	 * may_write true.
723  	 *
724  	 * @param[in,out] ht      The hash store to be modified
725  	 * @param[in]     key     A buffer descriptore locating the key to remove
726  	 * @param[in]     latch   A pointer to a structure filled by a previous
727  	 *                        call to hashtable_getlatched.
728  	 * @param[out] stored_key If non-NULL, a buffer descriptor the
729  	 *                        removed key as stored.
730  	 * @param[out] stored_val If non-NULL, a buffer descriptor for the
731  	 *                        removed value as stored.
732  	 *
733  	 */
734  	
735  	void hashtable_deletelatched(struct hash_table *ht,
736  				     const struct gsh_buffdesc *key,
737  				     struct hash_latch *latch,
738  				     struct gsh_buffdesc *stored_key,
739  				     struct gsh_buffdesc *stored_val)
740  	{
741  		/* The pair of buffer descriptors comprising the stored entry */
742  		struct hash_data *data;
743  		/* Its partition */
744  		struct hash_partition *partition = &ht->partitions[latch->index];
745  	
746  		data = RBT_OPAQ(latch->locator);
747  	
748  		if (isDebug(COMPONENT_HASHTABLE)
749  		    && isFullDebug(ht->parameter.ht_log_component)) {
750  			char dispkey[HASHTABLE_DISPLAY_STRLEN];
751  			char dispval[HASHTABLE_DISPLAY_STRLEN];
752  	
753  			if (ht->parameter.key_to_str != NULL)
754  				ht->parameter.key_to_str(&data->key, dispkey);
755  			else
756  				dispkey[0] = '\0';
757  	
758  			if (ht->parameter.val_to_str != NULL)
759  				ht->parameter.val_to_str(&data->val, dispval);
760  			else
761  				dispval[0] = '\0';
762  	
763  			LogFullDebug(ht->parameter.ht_log_component,
764  				     "Delete %s Key=%p {%s} Value=%p {%s} index=%"
765  				     PRIu32 " rbt_hash=%" PRIu64 " was removed",
766  				     ht->parameter.ht_name, data->key.addr, dispkey,
767  				     data->val.addr, dispval, latch->index,
768  				     latch->rbt_hash);
769  		}
770  	
771  		if (stored_key)
772  			*stored_key = data->key;
773  	
774  		if (stored_val)
775  			*stored_val = data->val;
776  	
777  		/* Clear cache */
778  		if (partition->cache) {
779  			uint32_t offset = cache_offsetof(ht, latch->rbt_hash);
780  			struct rbt_node *cnode = partition->cache[offset];
781  	
782  			if (cnode) {
783  	#if COMPARE_BEFORE_CLEAR_CACHE
784  				struct hash_data *data1 = RBT_OPAQ(cnode);
785  				struct hash_data *data2 = RBT_OPAQ(latch->locator);
786  	
787  				if (ht->parameter.compare_key(&data1->key, &data2->key)
788  				    == 0) {
789  					LogFullDebug(COMPONENT_HASHTABLE_CACHE,
790  						     "hash clear index %d slot %" PRIu64
791  						     latch->index, offset);
792  					partition->cache[offset] = NULL;
793  				}
794  	#else
795  				LogFullDebug(COMPONENT_HASHTABLE_CACHE,
796  					     "hash clear slot %d", offset);
797  				partition->cache[offset] = NULL;
798  	#endif
799  			}
800  		}
801  	
802  		/* Now remove the entry */
803  		RBT_UNLINK(&partition->rbt, latch->locator);
804  		pool_free(ht->data_pool, data);
805  		pool_free(ht->node_pool, latch->locator);
806  		--ht->partitions[latch->index].count;
807  	
808  		/* Some callers re-use the latch to insert a record after this call,
809  		 * so reset latch locator to avoid hashtable_setlatched() using the
810  		 * invalid latch->locator
811  		 */
812  		latch->locator = NULL;
813  	}
814  	
815  	/**
816  	 * @brief Remove and free all (key,val) couples from the hash store
817  	 *
818  	 * This function removes all (key,val) couples from the hashtable and
819  	 * frees the stored data using the supplied function
820  	 *
821  	 * @param[in,out] ht        The hashtable to be cleared of all entries
822  	 * @param[in]     free_func The function with which to free the contents
823  	 *                          of each entry
824  	 *
825  	 * @return HASHTABLE_SUCCESS or errors
826  	 */
827  	hash_error_t
828  	hashtable_delall(struct hash_table *ht,
829  			 int (*free_func)(struct gsh_buffdesc,
830  					  struct gsh_buffdesc))
831  	{
832  		/* Successive partition numbers */
833  		uint32_t index = 0;
834  	
835  		for (index = 0; index < ht->parameter.index_size; index++) {
836  			/* The root of each successive partition */
837  			struct rbt_head *root = &ht->partitions[index].rbt;
838  			/* Pointer to node in tree for removal */
839  			struct rbt_node *cursor = NULL;
840  	
841  			PTHREAD_RWLOCK_wrlock(&ht->partitions[index].lock);
842  	
843  			/* Continue until there are no more entries in the red-black
844  			   tree */
845  			while ((cursor = RBT_LEFTMOST(root)) != NULL) {
846  				/* Pointer to the key and value descriptors
847  				   for each successive entry */
848  				struct hash_data *data = NULL;
849  				/* Aliased poitner to node, for freeing
850  				   buffers after removal from tree */
851  				struct rbt_node *holder = cursor;
852  				/* Buffer descriptor for key, as stored */
853  				struct gsh_buffdesc key;
854  				/* Buffer descriptor for value, as stored */
855  				struct gsh_buffdesc val;
856  				/* Return code from the free function.  Zero
857  				   on failure */
858  				int rc = 0;
859  	
860  				RBT_UNLINK(root, cursor);
861  				data = RBT_OPAQ(holder);
862  	
863  				key = data->key;
864  				val = data->val;
865  	
866  				pool_free(ht->data_pool, data);
867  				pool_free(ht->node_pool, holder);
868  				--ht->partitions[index].count;
869  				rc = free_func(key, val);
870  	
871  				if (rc == 0) {
872  					PTHREAD_RWLOCK_unlock(
873  							&ht->partitions[index].lock);
874  					return HASHTABLE_ERROR_DELALL_FAIL;
875  				}
876  			}
877  			PTHREAD_RWLOCK_unlock(&ht->partitions[index].lock);
878  		}
879  	
880  		return HASHTABLE_SUCCESS;
881  	}
882  	
883  	/**
884  	 * @brief Log information about the hashtable
885  	 *
886  	 * This debugging function prints information about the hash table to
887  	 * the log.
888  	 *
889  	 * @param[in] component The component debugging config to use.
890  	 * @param[in] ht        The hashtable to be used.
891  	 */
892  	
893  	void
894  	hashtable_log(log_components_t component, struct hash_table *ht)
895  	{
896  		/* The current position in the hash table */
897  		struct rbt_node *it = NULL;
898  		/* The root of the tree currently being inspected */
899  		struct rbt_head *root;
900  		/* Buffer descriptors for the key and value */
901  		struct hash_data *data = NULL;
902  		/* String representation of the key */
903  		char dispkey[HASHTABLE_DISPLAY_STRLEN];
904  		/* String representation of the stored value */
905  		char dispval[HASHTABLE_DISPLAY_STRLEN];
906  		/* Index for traversing the partitions */
907  		uint32_t i = 0;
908  		/* Running count of entries  */
909  		size_t nb_entries = 0;
910  		/* Recomputed partitionindex */
911  		uint32_t index = 0;
912  		/* Recomputed hash for Red-Black tree */
913  		uint64_t rbt_hash = 0;
914  	
915  		LogFullDebug(component, "The hash is partitioned into %d trees",
916  			     ht->parameter.index_size);
917  	
918  		for (i = 0; i < ht->parameter.index_size; i++)
919  			nb_entries += ht->partitions[i].count;
920  	
921  		LogFullDebug(component, "The hash contains %zd entries", nb_entries);
922  	
923  		for (i = 0; i < ht->parameter.index_size; i++) {
924  			root = &ht->partitions[i].rbt;
925  			LogFullDebug(component,
926  				     "The partition in position %" PRIu32
927  				     "contains: %u entries", i, root->rbt_num_node);
928  			PTHREAD_RWLOCK_rdlock(&ht->partitions[i].lock);
929  			RBT_LOOP(root, it) {
930  				data = it->rbt_opaq;
931  	
932  				ht->parameter.key_to_str(&(data->key), dispkey);
933  				ht->parameter.val_to_str(&(data->val), dispval);
934  	
935  				if (compute(ht, &data->key, &index, &rbt_hash)
936  				    != HASHTABLE_SUCCESS) {
937  					LogCrit(component,
938  						"Possible implementation error in hash_func_both");
939  					index = 0;
940  					rbt_hash = 0;
941  				}
942  	
943  				LogFullDebug(component,
944  					     "%s => %s; index=%" PRIu32 " rbt_hash=%"
945  					     PRIu64, dispkey, dispval, index, rbt_hash);
946  				RBT_INCREMENT(it);
947  			}
948  			PTHREAD_RWLOCK_unlock(&ht->partitions[i].lock);
949  		}
950  	}
951  	
952  	/**
953  	 * @brief Set a pair (key,value) into the Hash Table
954  	 *
955  	 * Depending on the value of 'how', this function sets a value into
956  	 * the hash table or tests that the hash table contains that value.
957  	 *
958  	 * This function is deprecated.
959  	 *
960  	 * @param[in,out] ht  The hashtable to test or alter
961  	 * @param[in]     key The key to be set
962  	 * @param[in]     val The value to be stored
963  	 * @param[in]     how A value determining whether this is a test, a set
964  	 *                    with overwrite, or a set without overwrite.
965  	 *
966  	 * @retval HASHTABLE_SUCCESS if successfull.
967  	 */
968  	
969  	hash_error_t
970  	hashtable_test_and_set(struct hash_table *ht,
971  			       struct gsh_buffdesc *key,
972  			       struct gsh_buffdesc *val,
973  			       hash_set_how_t how)
974  	{
975  		/* structure to hold retained state */
976  		struct hash_latch latch;
977  		/* Stored return code */
978  		hash_error_t rc = 0;
979  	
980  		rc = hashtable_getlatch(ht, key, NULL,
981  					(how != HASHTABLE_SET_HOW_TEST_ONLY), &latch);
982  	
983  		if ((rc != HASHTABLE_SUCCESS) &&
984  		    (rc != HASHTABLE_ERROR_NO_SUCH_KEY))
985  			return rc;
986  	
987  		if (how == HASHTABLE_SET_HOW_TEST_ONLY) {
988  			hashtable_releaselatched(ht, &latch);
989  			return rc;
990  		}
991  	
992  		/* No point in calling hashtable_setlatched when we know it
993  		   will error. */
994  	
995  		if ((how == HASHTABLE_SET_HOW_SET_NO_OVERWRITE)
996  		    && (rc == HASHTABLE_SUCCESS)) {
997  			hashtable_releaselatched(ht, &latch);
998  			return HASHTABLE_ERROR_KEY_ALREADY_EXISTS;
999  		}
1000 	
1001 		rc = hashtable_setlatched(ht, key, val, &latch,
1002 					  (how == HASHTABLE_SET_HOW_SET_OVERWRITE),
1003 					  NULL, NULL);
1004 	
1005 		if (rc == HASHTABLE_OVERWRITTEN)
1006 			rc = HASHTABLE_SUCCESS;
1007 	
1008 		return rc;
1009 	}
1010 	
1011 	/**
1012 	 * @brief Look up a value and take a reference
1013 	 *
1014 	 * This function attempts to locate a key in the hash store and return
1015 	 * the associated value.  It also calls the supplied function to take
1016 	 * a reference before releasing the partition lock.  It is implemented
1017 	 * as a wrapper around hashtable_getlatched.
1018 	 *
1019 	 * @param[in]  ht      The hash store to be searched
1020 	 * @param[in]  key     A buffer descriptore locating the key to find
1021 	 * @param[out] val     A buffer descriptor locating the value found
1022 	 * @param[in]  get_ref A function to take a reference on the supplied
1023 	 *                     value
1024 	 *
1025 	 * @return HASHTABLE_SUCCESS or errors
1026 	 */
1027 	hash_error_t
1028 	hashtable_getref(hash_table_t *ht, struct gsh_buffdesc *key,
1029 			 struct gsh_buffdesc *val,
1030 			 void (*get_ref)(struct gsh_buffdesc *))
1031 	{
1032 		/* structure to hold retained state */
1033 		struct hash_latch latch;
1034 		/* Stored return code */
1035 		hash_error_t rc = 0;
1036 	
1037 		rc = hashtable_getlatch(ht, key, val, false, &latch);
1038 	
1039 		switch (rc) {
1040 		case HASHTABLE_SUCCESS:
1041 			if (get_ref != NULL)
1042 				get_ref(val);
1043 			/* FALLTHROUGH */
1044 		case HASHTABLE_ERROR_NO_SUCH_KEY:
1045 			hashtable_releaselatched(ht, &latch);
1046 			break;
1047 	
1048 		default:
1049 			break;
1050 		}
1051 	
1052 		return rc;
1053 	}
1054 	
1055 	void hashtable_for_each(hash_table_t *ht, ht_for_each_cb_t callback, void *arg)
1056 	{
1057 		uint32_t i;
1058 		struct rbt_head *head_rbt;
1059 		struct rbt_node *pn;
1060 	
1061 		/* For each bucket of the requested hashtable */
1062 		for (i = 0; i < ht->parameter.index_size; i++) {
1063 			head_rbt = &ht->partitions[i].rbt;
1064 			PTHREAD_RWLOCK_rdlock(&ht->partitions[i].lock);
1065 			RBT_LOOP(head_rbt, pn) {
1066 				callback(pn, arg);
1067 				RBT_INCREMENT(pn);
1068 			}
1069 			PTHREAD_RWLOCK_unlock(&ht->partitions[i].lock);
1070 		}
1071 	}
1072 	/** @} */
1073