/*
 * vim:noexpandtab:shiftwidth=8:tabstop=8:
 *
 * Portions Copyright CEA/DAM/DIF  (2008)
 * contributeur : Philippe DENIEL   philippe.deniel@cea.fr
 *                Thomas LEIBOVICI  thomas.leibovici@cea.fr
 *
 * Portions Copyright (C) 2012, The Linux Box Corporation
 * Contributor : Matt Benjamin <matt@linuxbox.com>
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public License
 * as published by the Free Software Foundation; either version 3 of
 * the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
 * 02110-1301 USA
 *
 * -------------
 */

/**
 * @file nfs_dupreq.c
 * @author Matt Benjamin <matt@linuxbox.com>
 * @brief NFS Duplicate Request Cache
 */

#include "config.h"
#include <unistd.h>
#include <sys/types.h>
#include <sys/param.h>
#include <time.h>
#include <pthread.h>
#include <assert.h>

/* XXX prune: */
#include "log.h"
#include "nfs_proto_functions.h"

#include "nfs_dupreq.h"
#include "city.h"
#include "abstract_mem.h"
#include "gsh_intrinsic.h"
#include "gsh_wait_queue.h"

#define DUPREQ_NOCACHE   0x02
#define DUPREQ_MAX_RETRIES 5

#define NFS_pcp nfs_param.core_param
#define NFS_program NFS_pcp.program

pool_t *dupreq_pool;
pool_t *nfs_res_pool;
pool_t *tcp_drc_pool;		/* pool of per-connection DRC objects */

const char *dupreq_status_table[] = {
	"DUPREQ_SUCCESS",
	"DUPREQ_INSERT_MALLOC_ERROR",
	"DUPREQ_BEING_PROCESSED",
	"DUPREQ_EXISTS",
	"DUPREQ_ERROR",
};

const char *dupreq_state_table[] = {
	"DUPREQ_START",
	"DUPREQ_COMPLETE",
	"DUPREQ_DELETED",
};

/* drc_t holds the request/response cache. There is a single drc_t for
 * all UDP connections. There is a drc_t for each TCP connection (aka
 * socket). Since a client could close a socket and reconnect, we would
 * like to use the same DRC for the reconnection. For this reason, we
 * don't want to free the drc as soon as the TCP connection gets
 * closed, but rather keep it on a recycle list for some time.
 *
 * The life of a TCP drc: it gets allocated when we process the first
 * request on the connection. It is inserted into an rbtree
 * (tcp_drc_recycle_t). The drc maintains a ref count. Every request as
 * well as the xprt holds a ref. The ref count should go to zero when
 * the connection's xprt gets freed (all requests should be completed on
 * the xprt by this time). When the ref count goes to zero, the drc is
 * also put onto a recycle queue (tcp_drc_recycle_q). When a
 * reconnection happens, we hope to find the same drc that was used
 * before, and its ref count goes up again. At the same time, the drc is
 * removed from the recycle queue. Only drc's with a ref count of zero
 * end up on the recycle queue. If a reconnection doesn't happen in
 * time, the drc gets freed by drc_free_expired() after some period of
 * inactivity.
 *
 * Most ref count schemes assume that a ref count never goes back up
 * from zero, so the thread that decrements the ref count to zero would
 * be the only one acting on the object, and could do so without any
 * locks!  Since the drc ref count can go up from zero, care must be
 * taken. The thread that decrements the ref count to zero has to put
 * the drc onto the recycle queue, and in the current implementation it
 * does so only after dropping the lock. If we let nfs_dupreq_get_drc()
 * reuse the drc before it gets onto the recycle queue, we could end up
 * with multiple threads decrementing the ref count to zero.
 *
 * Life of a dupreq: a thread processing an NFS request calls the
 * following functions in the order listed below (see the usage sketch
 * following struct drc_st):
 *
 * #1. nfs_dupreq_start(): creates/gets a dupreq for the request if it
 * needs the DRC. A newly created dupreq is placed in a hash table as
 * well as in a list. Its refcnt starts at 2: one for being in the hash
 * table and one for the request call path. If a dupreq already exists
 * in the hash table, a hold is taken on it instead.
 *
 * #2. nfs_dupreq_finish()/nfs_dupreq_delete(): only one of these two
 * functions is called. If the request completes successfully,
 * nfs_dupreq_finish() is called, which sets the state of the dupreq to
 * complete. If the request processing fails, there is usually no reason
 * to save the response, so nfs_dupreq_delete() is called to remove the
 * dupreq from the hash table. Neither function releases the hold placed
 * on the dupreq in nfs_dupreq_start() for the request call path.
 * nfs_dupreq_delete() does, however, release the hold the hash table
 * held on the dupreq.
 *
 * #3. nfs_dupreq_rele(): called at the end of request processing. This
 * releases the hold placed on the dupreq in nfs_dupreq_start() for the
 * request call path.
 *
 * A dupreq exists at least until the nfs_dupreq_rele() call. A hashed
 * dupreq will outlive this call, as the hash table still references it.
 * A dupreq is eventually removed from the hash table when the drc gets
 * freed, or when nfs_dupreq_finish() decides to retire a few dupreqs.
 */
struct drc_st {
	pthread_mutex_t mtx;
	drc_t udp_drc;		/* shared DRC */
	struct rbtree_x tcp_drc_recycle_t;
	TAILQ_HEAD(drc_st_tailq, drc) tcp_drc_recycle_q;	/* fifo */
	int32_t tcp_drc_recycle_qlen;
	time_t last_expire_check;
	uint32_t expire_delta;
};

static struct drc_st *drc_st;
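
/*
 * Illustrative sketch (not compiled) of the per-request call sequence
 * described above, as a hypothetical caller might drive it.  The helper
 * service_succeeded() is an assumption for illustration only; the
 * dupreq API calls are the real entry points defined below.
 */
#if 0
static void process_one_request(nfs_request_t *reqnfs, struct svc_req *req)
{
	/* #1: create or find the dupreq; a new entry starts with
	 * refcnt 2 (hash table + this call path) */
	dupreq_status_t status = nfs_dupreq_start(reqnfs, req);

	if (status == DUPREQ_SUCCESS) {
		if (service_succeeded(reqnfs))
			/* #2a: keep the response cached for replays */
			(void)nfs_dupreq_finish(req, reqnfs->res_nfs);
		else
			/* #2b: drop the entry; nothing worth replaying */
			(void)nfs_dupreq_delete(req);
	}
	/* DUPREQ_EXISTS: replay the cached res; DUPREQ_BEING_PROCESSED:
	 * drop the request on the floor */

	/* #3: always release the call-path hold from nfs_dupreq_start() */
	nfs_dupreq_rele(req, reqnfs->funcdesc);
}
#endif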

/**
 * @brief Comparison function for 32-bit unsigned integers
 *
 * @param[in] lhs An integer
 * @param[in] rhs Another integer
 *
 * @return -1 if the left-hand side is smaller than the right, 0 if they
 * are equal, and 1 if the left-hand side is larger.
 */
static inline int uint32_cmpf(uint32_t lhs, uint32_t rhs)
{
	if (lhs < rhs)
		return -1;

	if (lhs == rhs)
		return 0;

	return 1;
}

/**
 * @brief Comparison function for 64-bit unsigned integers
 *
 * @param[in] lhs An integer
 * @param[in] rhs Another integer
 *
 * @return -1 if the left-hand side is smaller than the right, 0 if they
 * are equal, and 1 if the left-hand side is larger.
 */
static inline int uint64_cmpf(uint64_t lhs, uint64_t rhs)
{
	if (lhs < rhs)
		return -1;

	if (lhs == rhs)
		return 0;

	return 1;
}

/**
 * @brief Comparison function for entries in a shared DRC
 *
 * @param[in] lhs  Left-hand-side
 * @param[in] rhs  Right-hand-side
 *
 * @return -1,0,1.
 */
static inline int dupreq_shared_cmpf(const struct opr_rbtree_node *lhs,
				     const struct opr_rbtree_node *rhs)
{
	dupreq_entry_t *lk, *rk;

	lk = opr_containerof(lhs, dupreq_entry_t, rbt_k);
	rk = opr_containerof(rhs, dupreq_entry_t, rbt_k);

	switch (sockaddr_cmpf(&lk->hin.addr, &rk->hin.addr, false)) {
	case -1:
		return -1;
	case 0:
		switch (uint32_cmpf(lk->hin.tcp.rq_xid, rk->hin.tcp.rq_xid)) {
		case -1:
			return -1;
		case 0:
			return uint64_cmpf(lk->hk, rk->hk);
		default:
			break;
		}		/* xid */
		break;
	default:
		break;
	}			/* addr+port */

	return 1;
}

/**
 * @brief Comparison function for entries in a per-connection (TCP) DRC
 *
 * @param[in] lhs  Left-hand-side
 * @param[in] rhs  Right-hand-side
 *
 * @return -1,0,1.
 */
static inline int dupreq_tcp_cmpf(const struct opr_rbtree_node *lhs,
				  const struct opr_rbtree_node *rhs)
{
	dupreq_entry_t *lk, *rk;

	LogDebug(COMPONENT_DUPREQ, "%s", __func__);

	lk = opr_containerof(lhs, dupreq_entry_t, rbt_k);
	rk = opr_containerof(rhs, dupreq_entry_t, rbt_k);

	if (lk->hin.tcp.rq_xid < rk->hin.tcp.rq_xid)
		return -1;

	if (lk->hin.tcp.rq_xid == rk->hin.tcp.rq_xid) {
		LogDebug(COMPONENT_DUPREQ,
			 "xids eq %" PRIu32 ", ck1 %" PRIu64 " ck2 %" PRIu64,
			 lk->hin.tcp.rq_xid, lk->hk, rk->hk);
		return uint64_cmpf(lk->hk, rk->hk);
	}

	return 1;
}

/**
 * @brief Comparison function for recycled per-connection (TCP) DRCs
 *
 * @param[in] lhs  Left-hand-side
 * @param[in] rhs  Right-hand-side
 *
 * @return -1,0,1.
 */
static inline int drc_recycle_cmpf(const struct opr_rbtree_node *lhs,
				   const struct opr_rbtree_node *rhs)
{
	drc_t *lk, *rk;

	lk = opr_containerof(lhs, drc_t, d_u.tcp.recycle_k);
	rk = opr_containerof(rhs, drc_t, d_u.tcp.recycle_k);

	return sockaddr_cmpf(
		&lk->d_u.tcp.addr, &rk->d_u.tcp.addr, false);
}

/**
 * @brief Initialize a shared duplicate request cache
 */
static inline void init_shared_drc(void)
{
	drc_t *drc = &drc_st->udp_drc;
	int ix, code __attribute__ ((unused)) = 0;

	drc->type = DRC_UDP_V234;
	drc->refcnt = 0;
	drc->retwnd = 0;
	drc->d_u.tcp.recycle_time = 0;
	drc->maxsize = nfs_param.core_param.drc.udp.size;
	drc->cachesz = nfs_param.core_param.drc.udp.cachesz;
	drc->npart = nfs_param.core_param.drc.udp.npart;
	drc->hiwat = nfs_param.core_param.drc.udp.hiwat;

	gsh_mutex_init(&drc->mtx, NULL);

	/* init dict */
	code =
	    rbtx_init(&drc->xt, dupreq_shared_cmpf, drc->npart,
		      RBT_X_FLAG_ALLOC | RBT_X_FLAG_CACHE_WT);
	assert(!code);

	/* completed requests */
	TAILQ_INIT(&drc->dupreq_q);

	/* init closed-form "cache" partition */
	for (ix = 0; ix < drc->npart; ++ix) {
		struct rbtree_x_part *xp = &(drc->xt.tree[ix]);

		drc->xt.cachesz = drc->cachesz;
		xp->cache =
		    gsh_calloc(drc->cachesz, sizeof(struct opr_rbtree_node *));
	}
}

/**
 * @brief Initialize the DRC package.
 */
void dupreq2_pkginit(void)
{
	int code __attribute__ ((unused)) = 0;

	dupreq_pool =
	    pool_basic_init("Duplicate Request Pool", sizeof(dupreq_entry_t));

	nfs_res_pool = pool_basic_init("nfs_res_t pool", sizeof(nfs_res_t));

	tcp_drc_pool = pool_basic_init("TCP DRC Pool", sizeof(drc_t));

	drc_st = gsh_calloc(1, sizeof(struct drc_st));

	/* init shared statics */
	gsh_mutex_init(&drc_st->mtx, NULL);

	/* recycle_t */
	code =
	    rbtx_init(&drc_st->tcp_drc_recycle_t, drc_recycle_cmpf,
		      nfs_param.core_param.drc.tcp.recycle_npart,
		      RBT_X_FLAG_ALLOC);
	/* XXX error? */

	/* init recycle_q */
	TAILQ_INIT(&drc_st->tcp_drc_recycle_q);
	drc_st->tcp_drc_recycle_qlen = 0;
	drc_st->last_expire_check = time(NULL);
	drc_st->expire_delta = nfs_param.core_param.drc.tcp.recycle_expire_s;

	/* UDP DRC is global, shared */
	init_shared_drc();
}

/**
 * @brief Determine the protocol of the supplied TI-RPC SVCXPRT*
 *
 * @param[in] xprt  The SVCXPRT
 *
 * @return IPPROTO_UDP or IPPROTO_TCP.
 */
static inline unsigned int get_ipproto_by_xprt(SVCXPRT *xprt)
{
	switch (xprt->xp_type) {
	case XPRT_UDP:
	case XPRT_UDP_RENDEZVOUS:
		return IPPROTO_UDP;
	case XPRT_TCP:
	case XPRT_TCP_RENDEZVOUS:
		return IPPROTO_TCP;
	default:
		break;
	}
	return IPPROTO_IP;	/* Dummy output */
}

/**
 * @brief Determine the dupreq2 DRC type to handle the supplied svc_req
 *
 * @param[in] req The svc_req being processed
 *
 * @return a value of type enum drc_type.
 */
static inline enum drc_type get_drc_type(struct svc_req *req)
{
	if (get_ipproto_by_xprt(req->rq_xprt) == IPPROTO_UDP)
		return DRC_UDP_V234;

	if (req->rq_msg.cb_vers == 4)
		return DRC_TCP_V4;

	return DRC_TCP_V3;
}

/**
 * @brief Allocate a duplicate request cache
 *
 * @param[in] dtype  Style DRC to allocate (e.g., TCP, by enum drc_type)
 *
 * @return the drc, if successfully allocated, else NULL.
 */
static inline drc_t *alloc_tcp_drc(enum drc_type dtype)
{
	drc_t *drc = pool_alloc(tcp_drc_pool);
	int ix, code __attribute__ ((unused)) = 0;

	drc->type = dtype;	/* DRC_TCP_V3 or DRC_TCP_V4 */
	drc->refcnt = 0;
	drc->retwnd = 0;
	drc->d_u.tcp.recycle_time = 0;
	drc->maxsize = nfs_param.core_param.drc.tcp.size;
	drc->cachesz = nfs_param.core_param.drc.tcp.cachesz;
	drc->npart = nfs_param.core_param.drc.tcp.npart;
	drc->hiwat = nfs_param.core_param.drc.tcp.hiwat;

	PTHREAD_MUTEX_init(&drc->mtx, NULL);

	/* init dict */
	code =
	    rbtx_init(&drc->xt, dupreq_tcp_cmpf, drc->npart,
		      RBT_X_FLAG_ALLOC | RBT_X_FLAG_CACHE_WT);
	assert(!code);

	/* completed requests */
	TAILQ_INIT(&drc->dupreq_q);

	/* recycling DRC */
	TAILQ_INIT_ENTRY(drc, d_u.tcp.recycle_q);

	/* init "cache" partition */
	for (ix = 0; ix < drc->npart; ++ix) {
		struct rbtree_x_part *xp = &(drc->xt.tree[ix]);

		drc->xt.cachesz = drc->cachesz;
		xp->cache =
		    gsh_calloc(drc->cachesz, sizeof(struct opr_rbtree_node *));
	}

	return drc;
}

/**
 * @brief Deep-free a per-connection (TCP) duplicate request cache
 *
 * @param[in] drc  The DRC to dispose
 *
 * Assumes that the DRC has been allocated from the tcp_drc_pool.
 */
static inline void free_tcp_drc(drc_t *drc)
{
	int ix;

	for (ix = 0; ix < drc->npart; ++ix) {
		if (drc->xt.tree[ix].cache)
			gsh_free(drc->xt.tree[ix].cache);
	}
	PTHREAD_MUTEX_destroy(&drc->mtx);
	LogFullDebug(COMPONENT_DUPREQ, "free TCP drc %p", drc);
	pool_free(tcp_drc_pool, drc);
}

/**
 * @brief Increment the reference count on a DRC
 *
 * @param[in] drc  The DRC to ref
 *
 * @return the new value of refcnt.
 */
static inline uint32_t nfs_dupreq_ref_drc(drc_t *drc)
{
	return ++(drc->refcnt); /* locked */
}

/**
 * @brief Decrement the reference count on a DRC
 *
 * @param[in] drc  The DRC to unref
 *
 * @return the new value of refcnt.
 */
static inline uint32_t nfs_dupreq_unref_drc(drc_t *drc)
{
	return --(drc->refcnt); /* locked */
}

#define DRC_ST_LOCK()				\
	PTHREAD_MUTEX_lock(&drc_st->mtx)

#define DRC_ST_UNLOCK()				\
	PTHREAD_MUTEX_unlock(&drc_st->mtx)

static inline void dupreq_entry_put(dupreq_entry_t *dv);

/**
 * @brief Check for expired TCP DRCs.
 */
static inline void drc_free_expired(void)
{
	drc_t *drc;
	time_t now = time(NULL);
	struct rbtree_x_part *t;
	struct opr_rbtree_node *odrc = NULL;
	struct dupreq_entry *dv;
	struct dupreq_entry *tdv;

	DRC_ST_LOCK();

	if ((drc_st->tcp_drc_recycle_qlen < 1) ||
	    (now - drc_st->last_expire_check) < 600) /* 10m */
		goto unlock;

	do {
		drc = TAILQ_FIRST(&drc_st->tcp_drc_recycle_q);
		if (drc && (drc->d_u.tcp.recycle_time > 0)
		    && ((now - drc->d_u.tcp.recycle_time) >
			drc_st->expire_delta)) {

			assert(drc->refcnt == 0);

			LogFullDebug(COMPONENT_DUPREQ,
				     "remove expired drc %p from recycle queue",
				     drc);
			t = rbtx_partition_of_scalar(&drc_st->tcp_drc_recycle_t,
						     drc->d_u.tcp.hk);

			odrc =
			    opr_rbtree_lookup(&t->t, &drc->d_u.tcp.recycle_k);
			if (!odrc) {
				LogCrit(COMPONENT_DUPREQ,
					"BUG: asked to dequeue DRC not on queue");
			} else {
				(void)opr_rbtree_remove(
						&t->t, &drc->d_u.tcp.recycle_k);
			}
			TAILQ_REMOVE(&drc_st->tcp_drc_recycle_q, drc,
				     d_u.tcp.recycle_q);
			--(drc_st->tcp_drc_recycle_qlen);

			/* Free any dupreqs in this drc. No need to
			 * remove dupreqs from the hash table or
			 * drc->dupreq_q list individually as the drc is
			 * going to be freed anyway.  There shouldn't be
			 * any active requests, so all these dupreqs
			 * will have a refcnt of 1 for being in the hash
			 * table.
			 */
			TAILQ_FOREACH_SAFE(dv, &drc->dupreq_q, fifo_q, tdv) {
				assert(dv->refcnt == 1);
				dupreq_entry_put(dv);
			}
			free_tcp_drc(drc);
		} else {
			LogFullDebug(COMPONENT_DUPREQ,
				     "unexpired drc %p in recycle queue expire check (nothing happens)",
				     drc);
			drc_st->last_expire_check = now;
			break;
		}

	} while (1);

 unlock:
	DRC_ST_UNLOCK();
}

/**
 * @brief Find and reference a DRC to process the supplied svc_req.
 *
 * @param[in] req  The svc_req being processed.
 *
 * @return The ref'd DRC if successfully located, else NULL.
 */
static /* inline */ drc_t *
nfs_dupreq_get_drc(struct svc_req *req)
{
	enum drc_type dtype = get_drc_type(req);
	drc_t *drc = NULL;
	bool drc_check_expired = false;

	switch (dtype) {
	case DRC_UDP_V234:
		LogFullDebug(COMPONENT_DUPREQ, "ref shared UDP DRC");
		drc = &(drc_st->udp_drc);
		DRC_ST_LOCK();
		(void)nfs_dupreq_ref_drc(drc);
		DRC_ST_UNLOCK();
		goto out;
retry:
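	/* Re-entered from below when a transient state was observed
	 * (xp_u2 set concurrently, or a zero-refcnt drc not yet on the
	 * recycle queue); re-run the TCP lookup from the top. */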
	case DRC_TCP_V4:
	case DRC_TCP_V3:
		/* Idempotent address, no need for lock;
		 * xprt will be valid as long as svc_req.
		 */
		drc = (drc_t *)req->rq_xprt->xp_u2;
		if (drc) {
			/* found, no danger of removal */
			LogFullDebug(COMPONENT_DUPREQ, "ref DRC=%p for xprt=%p",
				     drc, req->rq_xprt);
			PTHREAD_MUTEX_lock(&drc->mtx);	/* LOCKED */
		} else {
			drc_t drc_k;
			struct rbtree_x_part *t = NULL;
			struct opr_rbtree_node *ndrc = NULL;
			drc_t *tdrc = NULL;

			memset(&drc_k, 0, sizeof(drc_k));
			drc_k.type = dtype;

			/* Since the drc can last longer than the xprt,
			 * copy the address. Read operation of constant data,
			 * no xprt lock required.
			 */
			(void)copy_xprt_addr(&drc_k.d_u.tcp.addr, req->rq_xprt);

			drc_k.d_u.tcp.hk =
			    CityHash64WithSeed((char *)&drc_k.d_u.tcp.addr,
					       sizeof(sockaddr_t), 911);
			{
				char str[SOCK_NAME_MAX];

				sprint_sockaddr(&drc_k.d_u.tcp.addr,
						str, sizeof(str));
				LogFullDebug(COMPONENT_DUPREQ,
					     "get drc for addr: %s", str);
			}

			t = rbtx_partition_of_scalar(&drc_st->tcp_drc_recycle_t,
						     drc_k.d_u.tcp.hk);
			DRC_ST_LOCK();

			/* Avoid double reference of drc,
			 * rechecking xp_u2 after DRC_ST_LOCK */
			if (req->rq_xprt->xp_u2) {
				DRC_ST_UNLOCK();
				goto retry;
			}

			ndrc =
			    opr_rbtree_lookup(&t->t, &drc_k.d_u.tcp.recycle_k);
			if (ndrc) {
				/* reuse old DRC */
				tdrc = opr_containerof(ndrc, drc_t,
						       d_u.tcp.recycle_k);
				PTHREAD_MUTEX_lock(&tdrc->mtx);	/* LOCKED */

				/* If the refcnt is zero and it is not
				 * in the recycle queue, wait for the
				 * other thread to put it in the queue.
				 */
				if (tdrc->refcnt == 0) {
					if (!(tdrc->flags & DRC_FLAG_RECYCLE)) {
						PTHREAD_MUTEX_unlock(
								&tdrc->mtx);
						DRC_ST_UNLOCK();
						goto retry;
					}
					TAILQ_REMOVE(&drc_st->tcp_drc_recycle_q,
						     tdrc, d_u.tcp.recycle_q);
					--(drc_st->tcp_drc_recycle_qlen);
					tdrc->flags &= ~DRC_FLAG_RECYCLE;
				}
				drc = tdrc;
				LogFullDebug(COMPONENT_DUPREQ,
					     "recycle TCP DRC=%p for xprt=%p",
					     tdrc, req->rq_xprt);
			}

			if (!drc) {
				drc = alloc_tcp_drc(dtype);
				LogFullDebug(COMPONENT_DUPREQ,
					     "alloc new TCP DRC=%p for xprt=%p",
					     drc, req->rq_xprt);
				/* assign addr */
				memcpy(&drc->d_u.tcp.addr, &drc_k.d_u.tcp.addr,
				       sizeof(sockaddr_t));
				/* assign already-computed hash */
				drc->d_u.tcp.hk = drc_k.d_u.tcp.hk;
				PTHREAD_MUTEX_lock(&drc->mtx);	/* LOCKED */
				/* insert dict */
				opr_rbtree_insert(&t->t,
						  &drc->d_u.tcp.recycle_k);
			}

			/* Avoid double reference of drc,
			 * setting xp_u2 under DRC_ST_LOCK */
			req->rq_xprt->xp_u2 = (void *)drc;
			(void)nfs_dupreq_ref_drc(drc);  /* xprt ref */

			DRC_ST_UNLOCK();
			drc->d_u.tcp.recycle_time = 0;

			/* try to expire unused DRCs somewhat in proportion to
			 * new connection arrivals */
			drc_check_expired = true;

			LogFullDebug(COMPONENT_DUPREQ,
				     "after ref drc %p refcnt==%u ", drc,
				     drc->refcnt);
		}
		break;
	default:
		/* XXX error */
		break;
	}

	/* call path ref */
	(void)nfs_dupreq_ref_drc(drc);
	PTHREAD_MUTEX_unlock(&drc->mtx);

	if (drc_check_expired)
		drc_free_expired();

out:
	return drc;
}

/**
 * @brief Release a previously-ref'd DRC.
 *
 * If the DRC's refcnt drops to 0, it is queued for later recycling.
 *
 * @param[in] drc   The DRC
 * @param[in] flags Control flags
 */
void nfs_dupreq_put_drc(drc_t *drc, uint32_t flags)
{
	if (!(flags & DRC_FLAG_LOCKED))
		PTHREAD_MUTEX_lock(&drc->mtx);
	/* drc LOCKED */

	if (drc->refcnt == 0) {
		LogCrit(COMPONENT_DUPREQ,
			"drc %p refcnt will underrun refcnt=%u", drc,
			drc->refcnt);
	}

	nfs_dupreq_unref_drc(drc);

	LogFullDebug(COMPONENT_DUPREQ, "drc %p refcnt==%u", drc, drc->refcnt);

	switch (drc->type) {
	case DRC_UDP_V234:
		/* do nothing */
		break;
	case DRC_TCP_V4:
	case DRC_TCP_V3:
		if (drc->refcnt != 0) /* quick path */
			break;

		/* note t's lock order wrt drc->mtx is the opposite of
		 * drc->xt[*].lock. Drop and reacquire locks in correct
		 * order.
		 */
		PTHREAD_MUTEX_unlock(&drc->mtx);
		DRC_ST_LOCK();
		PTHREAD_MUTEX_lock(&drc->mtx);

		/* Since we dropped and reacquired the drc lock for the
		 * correct lock order, we need to recheck the drc fields
		 * again!
		 */
		if (drc->refcnt == 0 && !(drc->flags & DRC_FLAG_RECYCLE)) {
			drc->d_u.tcp.recycle_time = time(NULL);
			drc->flags |= DRC_FLAG_RECYCLE;
			TAILQ_INSERT_TAIL(&drc_st->tcp_drc_recycle_q,
					  drc, d_u.tcp.recycle_q);
			++(drc_st->tcp_drc_recycle_qlen);
			LogFullDebug(COMPONENT_DUPREQ,
				     "enqueue drc %p for recycle", drc);
		}
		DRC_ST_UNLOCK();
		break;

	default:
		break;
	}

	PTHREAD_MUTEX_unlock(&drc->mtx); /* !LOCKED */
}

/**
 * @brief Resolve indirect request function vector for the supplied DRC entry
 *
 * @param[in] dv The duplicate request entry.
 *
 * @return The function vector if successful, else NULL.
 */
static inline const nfs_function_desc_t *nfs_dupreq_func(dupreq_entry_t *dv)
{
	const nfs_function_desc_t *func = NULL;

	if (dv->hin.rq_prog == NFS_program[P_NFS]) {
		switch (dv->hin.rq_vers) {
#ifdef _USE_NFS3
		case NFS_V3:
			func = &nfs3_func_desc[dv->hin.rq_proc];
			break;
#endif /* _USE_NFS3 */
		case NFS_V4:
			func = &nfs4_func_desc[dv->hin.rq_proc];
			break;
		default:
			/* not reached */
			LogMajor(COMPONENT_DUPREQ,
				 "NFS Protocol version %" PRIu32 " unknown",
				 dv->hin.rq_vers);
		}
	} else if (dv->hin.rq_prog == NFS_program[P_MNT]) {
		switch (dv->hin.rq_vers) {
		case MOUNT_V1:
			func = &mnt1_func_desc[dv->hin.rq_proc];
			break;
		case MOUNT_V3:
			func = &mnt3_func_desc[dv->hin.rq_proc];
			break;
		default:
			/* not reached */
			LogMajor(COMPONENT_DUPREQ,
				 "MOUNT Protocol version %" PRIu32 " unknown",
				 dv->hin.rq_vers);
			break;
		}
#ifdef _USE_NLM
	} else if (dv->hin.rq_prog == NFS_program[P_NLM]) {
		switch (dv->hin.rq_vers) {
		case NLM4_VERS:
			func = &nlm4_func_desc[dv->hin.rq_proc];
			break;
		}
#endif /* _USE_NLM */
	} else if (dv->hin.rq_prog == NFS_program[P_RQUOTA]) {
		switch (dv->hin.rq_vers) {
		case RQUOTAVERS:
			func = &rquota1_func_desc[dv->hin.rq_proc];
			break;
		case EXT_RQUOTAVERS:
			func = &rquota2_func_desc[dv->hin.rq_proc];
			break;
		}
	} else {
		/* not reached */
		LogMajor(COMPONENT_DUPREQ,
			 "protocol %" PRIu32 " is not managed",
			 dv->hin.rq_prog);
	}

	return func;
}

/**
 * @brief Construct a duplicate request cache entry.
 *
 * Entries are allocated from the dupreq_pool.  Since dupreq_entry_t
 * presently contains an expanded nfs_arg_t, zeroing of at least the
 * corresponding value pointers is required for XDR allocation.
 *
 * @return The newly allocated dupreq entry or NULL.
 */
static inline dupreq_entry_t *alloc_dupreq(void)
{
	dupreq_entry_t *dv;

	dv = pool_alloc(dupreq_pool);
	gsh_mutex_init(&dv->mtx, NULL);
	TAILQ_INIT_ENTRY(dv, fifo_q);

	return dv;
}

/**
 * @brief Deep-free a duplicate request cache entry.
 *
 * If the entry has processed request data, the corresponding free
 * function is called on the result.  The cache entry is then returned
 * to the dupreq_pool.
 */
static inline void nfs_dupreq_free_dupreq(dupreq_entry_t *dv)
{
	const nfs_function_desc_t *func;

	assert(dv->refcnt == 0);

	LogDebug(COMPONENT_DUPREQ,
		 "freeing dupreq entry dv=%p, dv xid=%" PRIu32
		 " cksum %" PRIu64 " state=%s",
		 dv, dv->hin.tcp.rq_xid, dv->hk,
		 dupreq_state_table[dv->state]);
	if (dv->res) {
		func = nfs_dupreq_func(dv);
		func->free_function(dv->res);
		free_nfs_res(dv->res);
	}
	PTHREAD_MUTEX_destroy(&dv->mtx);
	pool_free(dupreq_pool, dv);
}

/**
 * @brief Get a ref count on a dupreq_entry_t
 */
static inline void dupreq_entry_get(dupreq_entry_t *dv)
{
	(void)atomic_inc_uint32_t(&dv->refcnt);
}

/**
 * @brief Release a ref count on a dupreq_entry_t
 *
 * The caller must not access dv any more after this call as it could be
 * freed here.
 */
static inline void dupreq_entry_put(dupreq_entry_t *dv)
{
	int32_t refcnt;

	refcnt = atomic_dec_uint32_t(&dv->refcnt);

	/* If the ref count is zero, no one else should be accessing the
	 * entry, so no lock is needed.
	 */
	if (refcnt == 0) {
		nfs_dupreq_free_dupreq(dv);
	}
}

/**
 * @page DRC_RETIRE DRC request retire heuristic.
 *
 * We add a new, per-drc semaphore-like counter, retwnd.  The value of
 * retwnd begins at 0, and is always >= 0.  The value of retwnd is
 * increased when a duplicate request cache hit occurs.  If it was 0, it
 * is increased by some small constant, say, 16, otherwise, by 1.  And
 * retwnd decreases by 1 whenever we successfully finish any request.
 * In finish, a cached request may be retired iff we are above our water
 * mark and retwnd is 0.
 */

#define RETWND_START_BIAS 16
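
/*
 * Worked example of the heuristic above (numbers are illustrative):
 * suppose size > hiwat and retwnd == 0, so each completed request may
 * retire cached entries.  A duplicate hit then sets retwnd to
 * RETWND_START_BIAS (16); the next 16 completions each decrement it,
 * and no retirement occurs until retwnd reaches 0 again (each further
 * hit adds 2, a net +1 after the finish-path decrement).  The hard
 * bound size > maxsize always permits retirement, regardless of retwnd.
 */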

/**
 * @brief Advance retwnd.
 *
 * If (drc)->retwnd is 0, advance its value to RETWND_START_BIAS, else
 * increase its value by 2 (a net +1, after the decrement in the finish
 * path) iff the cache is not full.
 *
 * @param[in] drc The duplicate request cache
 */
#define drc_inc_retwnd(drc)					\
	do {							\
		if ((drc)->retwnd == 0)				\
			(drc)->retwnd = RETWND_START_BIAS;	\
		else						\
			if ((drc)->retwnd < (drc)->maxsize)	\
				(drc)->retwnd += 2;		\
	} while (0)

/**
 * @brief Conditionally decrement retwnd.
 *
 * If (drc)->retwnd > 0, decrease its value by 1.
 *
 * @param[in] drc The duplicate request cache
 */
#define drc_dec_retwnd(drc)			\
	do {					\
		if ((drc)->retwnd > 0)		\
			--((drc)->retwnd);	\
	} while (0)

/**
 * @brief Retire request predicate.
 *
 * Calculate whether a request may be retired from the provided duplicate
 * request cache.
 *
 * @param[in] drc The duplicate request cache
 *
 * @return true if a request may be retired, else false.
 */
static inline bool drc_should_retire(drc_t *drc)
{
	/* do not exceed the hard bound on cache size */
	if (unlikely(drc->size > drc->maxsize))
		return true;

	/* otherwise, are we permitted to retire requests? */
	if (unlikely(drc->retwnd > 0))
		return false;

	/* finally, retire if drc->size is above the intended high water mark */
	if (unlikely(drc->size > drc->hiwat))
		return true;

	return false;
}

static inline bool nfs_dupreq_v4_cacheable(nfs_request_t *reqnfs)
{
	COMPOUND4args *arg_c4 = (COMPOUND4args *)&reqnfs->arg_nfs;

	if (arg_c4->minorversion > 0)
		return false;
	if ((reqnfs->lookahead.flags & (NFS_LOOKAHEAD_CREATE)))
		/* override OPEN4_CREATE */
		return true;
	if ((reqnfs->lookahead.flags &
	     (NFS_LOOKAHEAD_OPEN | /* all logical OPEN */
	      NFS_LOOKAHEAD_CLOSE | NFS_LOOKAHEAD_LOCK | /* includes LOCKU */
	      NFS_LOOKAHEAD_READ | /* because large, though idempotent */
	      NFS_LOOKAHEAD_READLINK |
	      NFS_LOOKAHEAD_READDIR)))
		return false;
	return true;
}

/**
 * @brief Start a duplicate request transaction
 *
 * Finds any matching request entry in the cache, if one exists, else
 * creates one in the START state.  On any non-error return, the refcnt
 * of the corresponding entry is incremented.
 *
 * @param[in] reqnfs  The NFS request data
 * @param[in] req     The request to be cached
 *
 * @retval DUPREQ_SUCCESS if successful.
 * @retval DUPREQ_INSERT_MALLOC_ERROR if an error occurred during insertion.
 */
dupreq_status_t nfs_dupreq_start(nfs_request_t *reqnfs,
				 struct svc_req *req)
{
	dupreq_entry_t *dv = NULL, *dk = NULL;
	drc_t *drc;
	dupreq_status_t status = DUPREQ_SUCCESS;

	if (!(reqnfs->funcdesc->dispatch_behaviour & CAN_BE_DUP))
		goto no_cache;

	if (nfs_param.core_param.drc.disabled)
		goto no_cache;

	if (reqnfs->funcdesc->service_function == nfs4_Compound
	 && !nfs_dupreq_v4_cacheable(reqnfs)) {
		/* For such requests, we merely thread the request
		 * through for later cleanup.  All v41 caching is
		 * handled by the v41 slot reply cache.
		 */
		goto no_cache;
	}

	drc = nfs_dupreq_get_drc(req);
	dk = alloc_dupreq();

	switch (drc->type) {
	case DRC_TCP_V4:
	case DRC_TCP_V3:
		dk->hin.tcp.rq_xid = req->rq_msg.rm_xid;
		/* XXX needed? */
		dk->hin.rq_prog = req->rq_msg.cb_prog;
		dk->hin.rq_vers = req->rq_msg.cb_vers;
		dk->hin.rq_proc = req->rq_msg.cb_proc;
		break;
	case DRC_UDP_V234:
		dk->hin.tcp.rq_xid = req->rq_msg.rm_xid;
		if (unlikely(!copy_xprt_addr(&dk->hin.addr, req->rq_xprt))) {
			nfs_dupreq_put_drc(drc, DRC_FLAG_NONE);
			nfs_dupreq_free_dupreq(dk);
			return DUPREQ_INSERT_MALLOC_ERROR;
		}
		dk->hin.rq_prog = req->rq_msg.cb_prog;
		dk->hin.rq_vers = req->rq_msg.cb_vers;
		dk->hin.rq_proc = req->rq_msg.cb_proc;
		break;
	default:
		/* @todo: should this be an assert? */
		nfs_dupreq_put_drc(drc, DRC_FLAG_NONE);
		nfs_dupreq_free_dupreq(dk);
		return DUPREQ_INSERT_MALLOC_ERROR;
	}

	dk->hk = req->rq_cksum; /* TI-RPC computed checksum */
	dk->state = DUPREQ_START;
	dk->timestamp = time(NULL);

	{
		struct opr_rbtree_node *nv;
		struct rbtree_x_part *t =
		    rbtx_partition_of_scalar(&drc->xt, dk->hk);
		PTHREAD_MUTEX_lock(&t->mtx);	/* partition lock */
		nv = rbtree_x_cached_lookup(&drc->xt, t, &dk->rbt_k, dk->hk);
		if (nv) {
			/* cached request */
			nfs_dupreq_free_dupreq(dk);
			dv = opr_containerof(nv, dupreq_entry_t, rbt_k);
			PTHREAD_MUTEX_lock(&dv->mtx);
			if (unlikely(dv->state == DUPREQ_START)) {
				status = DUPREQ_BEING_PROCESSED;
			} else {
				/* satisfy req from the DRC, incref,
				   extend window */
				req->rq_u1 = dv;
				reqnfs->res_nfs = req->rq_u2 = dv->res;
				status = DUPREQ_EXISTS;
				dupreq_entry_get(dv);
			}
			PTHREAD_MUTEX_unlock(&dv->mtx);

			if (status == DUPREQ_EXISTS) {
				PTHREAD_MUTEX_lock(&drc->mtx);
				drc_inc_retwnd(drc);
				PTHREAD_MUTEX_unlock(&drc->mtx);
			}

			LogDebug(COMPONENT_DUPREQ,
				 "dupreq hit dv=%p, dv xid=%" PRIu32
				 " cksum %" PRIu64 " state=%s",
				 dv, dv->hin.tcp.rq_xid, dv->hk,
				 dupreq_state_table[dv->state]);
		} else {
			/* new request */
			req->rq_u1 = dk;
			dk->res = alloc_nfs_res();
			reqnfs->res_nfs = req->rq_u2 = dk->res;

			/* cache--can exceed drc->maxsize */
			(void)rbtree_x_cached_insert(&drc->xt, t,
						&dk->rbt_k, dk->hk);

			/* dupreq ref count starts with 2; one for the caller
			 * and another for staying in the hash table.
			 */
			dk->refcnt = 2;

			/* add to q tail */
			PTHREAD_MUTEX_lock(&drc->mtx);
			TAILQ_INSERT_TAIL(&drc->dupreq_q, dk, fifo_q);
			++(drc->size);
			PTHREAD_MUTEX_unlock(&drc->mtx);

			LogFullDebug(COMPONENT_DUPREQ,
				     "starting dk=%p xid=%" PRIu32
				     " on DRC=%p state=%s, status=%s, refcnt=%d, drc->size=%d",
				     dk, dk->hin.tcp.rq_xid, drc,
				     dupreq_state_table[dk->state],
				     dupreq_status_table[status],
				     dk->refcnt, drc->size);
		}
		PTHREAD_MUTEX_unlock(&t->mtx);
	}

	return status;

no_cache:
	req->rq_u1 = (void *)DUPREQ_NOCACHE;
	reqnfs->res_nfs = req->rq_u2 = alloc_nfs_res();
	return DUPREQ_SUCCESS;
}

/**
 * @brief Completes a request in the cache
 *
 * Completes a cache insertion operation begun in nfs_dupreq_start.
 * The refcnt of the corresponding duplicate request entry is unchanged
 * (i.e., the caller must still call nfs_dupreq_rele).
 *
 * In contrast with the prior DRC implementation, completing a request
 * in the current implementation may under normal conditions cause one
 * or more cached requests to be retired.  Requests are retired in the
 * order they were inserted.  The primary retire algorithm is a high
 * water mark combined with a windowing heuristic.  One or more requests
 * will be retired if the water mark/timeout is exceeded and no
 * duplicate requests have been found in the cache in a configurable
 * window of immediately preceding requests.  A timeout may supplement
 * the water mark in future.
 *
 * req->rq_u1 has either a magic value, or points to a duplicate request
 * cache entry allocated in nfs_dupreq_start.
 *
 * @param[in] req     The request
 * @param[in] res_nfs The response
 *
 * @return DUPREQ_SUCCESS if successful.
 * @return DUPREQ_INSERT_MALLOC_ERROR if an error occurred.
 */
dupreq_status_t nfs_dupreq_finish(struct svc_req *req, nfs_res_t *res_nfs)
{
	dupreq_entry_t *ov = NULL, *dv = (dupreq_entry_t *)req->rq_u1;
	dupreq_status_t status = DUPREQ_SUCCESS;
	struct rbtree_x_part *t;
	drc_t *drc = NULL;
	int16_t cnt = 0;

	/* do nothing if req is marked no-cache */
	if (dv == (void *)DUPREQ_NOCACHE)
		goto out;

	PTHREAD_MUTEX_lock(&dv->mtx);
	dv->res = res_nfs;
	dv->timestamp = time(NULL);
	dv->state = DUPREQ_COMPLETE;
	PTHREAD_MUTEX_unlock(&dv->mtx);

	drc = req->rq_xprt->xp_u2; /* req holds a ref on drc */
	PTHREAD_MUTEX_lock(&drc->mtx);

	LogFullDebug(COMPONENT_DUPREQ,
		     "completing dv=%p xid=%" PRIu32
		     " on DRC=%p state=%s, status=%s, refcnt=%d, drc->size=%d",
		dv, dv->hin.tcp.rq_xid, drc,
		dupreq_state_table[dv->state], dupreq_status_table[status],
		dv->refcnt, drc->size);

	/* (all) finished requests count against retwnd */
	drc_dec_retwnd(drc);

	/* conditionally retire entries */
dq_again:
	if (drc_should_retire(drc)) {
		ov = TAILQ_FIRST(&drc->dupreq_q);
		if (likely(ov)) {
			/* remove dict entry */
			t = rbtx_partition_of_scalar(&drc->xt, ov->hk);
			uint64_t ov_hk = ov->hk;

			/* Need to acquire the partition lock, but the lock
			 * order is partition lock followed by drc lock.
			 * Drop the drc lock and reacquire it!
			 */
			PTHREAD_MUTEX_unlock(&drc->mtx);
			PTHREAD_MUTEX_lock(&t->mtx);	/* partition lock */
			PTHREAD_MUTEX_lock(&drc->mtx);

			/* Since we dropped the drc lock and reacquired it,
			 * the drc dupreq list may have changed. Get the
			 * dupreq entry from the list again.
			 */
			ov = TAILQ_FIRST(&drc->dupreq_q);

			/* Make sure that we are removing the entry we
			 * expected (imperfect, but harmless).
			 */
			if (ov == NULL || ov->hk != ov_hk) {
				PTHREAD_MUTEX_unlock(&t->mtx);
				goto unlock;
			}

			/* remove q entry */
			TAILQ_REMOVE(&drc->dupreq_q, ov, fifo_q);
			TAILQ_INIT_ENTRY(ov, fifo_q);
			--(drc->size);
			PTHREAD_MUTEX_unlock(&drc->mtx);

			rbtree_x_cached_remove(&drc->xt, t, &ov->rbt_k, ov->hk);

			PTHREAD_MUTEX_unlock(&t->mtx);

			LogDebug(COMPONENT_DUPREQ,
				 "retiring ov=%p xid=%" PRIu32
				 " on DRC=%p state=%s, status=%s, refcnt=%d",
				 ov, ov->hin.tcp.rq_xid,
				 drc, dupreq_state_table[ov->state],
				 dupreq_status_table[status], ov->refcnt);

			/* release hashtable ref count */
			dupreq_entry_put(ov);

			/* conditionally retire another */
			if (cnt++ < DUPREQ_MAX_RETRIES) {
				PTHREAD_MUTEX_lock(&drc->mtx);
				goto dq_again; /* calls drc_should_retire() */
			}
			goto out;
		}
	}

 unlock:
	PTHREAD_MUTEX_unlock(&drc->mtx);

 out:
	return status;
}

/**
 * @brief Remove an entry (request) from a duplicate request cache.
 *
 * The expected pattern is that nfs_rpc_process_request shall delete requests
 * only in error conditions.  The refcnt of the corresponding duplicate request
 * entry is unchanged (i.e., the caller must still call nfs_dupreq_rele).
 *
 * We assert req->rq_u1 now points to the corresponding duplicate request
 * cache entry.
 *
 * @param[in] req The svc_req structure.
 *
 * @return DUPREQ_SUCCESS if successful.
 */
dupreq_status_t nfs_dupreq_delete(struct svc_req *req)
{
	dupreq_entry_t *dv = (dupreq_entry_t *)req->rq_u1;
	dupreq_status_t status = DUPREQ_SUCCESS;
	struct rbtree_x_part *t;
	drc_t *drc;

	/* do nothing if req is marked no-cache */
	if (dv == (void *)DUPREQ_NOCACHE)
		goto out;

	PTHREAD_MUTEX_lock(&dv->mtx);
	dv->state = DUPREQ_DELETED;
	PTHREAD_MUTEX_unlock(&dv->mtx);
	drc = req->rq_xprt->xp_u2;

	LogFullDebug(COMPONENT_DUPREQ,
		     "deleting dv=%p xid=%" PRIu32
		     " on DRC=%p state=%s, status=%s, refcnt=%d",
		     dv, dv->hin.tcp.rq_xid, drc,
		     dupreq_state_table[dv->state], dupreq_status_table[status],
		     dv->refcnt);

	/* This function is called to remove this dupreq from the
	 * hashtable/list, but it is possible that another thread
	 * processing a different request and calling nfs_dupreq_finish()
	 * has already deleted this dupreq.
	 *
	 * If this dupreq is already removed from the hash table/list, do
	 * nothing.
	 *
	 * req holds a ref on drc, so it should be valid here.
	 * assert(drc == (drc_t *)req->rq_xprt->xp_u2);
	 */
	PTHREAD_MUTEX_lock(&drc->mtx);
	if (!TAILQ_IS_ENQUEUED(dv, fifo_q)) {
		PTHREAD_MUTEX_unlock(&drc->mtx);
		goto out; /* no longer in the hash table/list, nothing to do */
	}
	TAILQ_REMOVE(&drc->dupreq_q, dv, fifo_q);
	TAILQ_INIT_ENTRY(dv, fifo_q);
	--(drc->size);
	PTHREAD_MUTEX_unlock(&drc->mtx);

	t = rbtx_partition_of_scalar(&drc->xt, dv->hk);

	PTHREAD_MUTEX_lock(&t->mtx);
	rbtree_x_cached_remove(&drc->xt, t, &dv->rbt_k, dv->hk);
	PTHREAD_MUTEX_unlock(&t->mtx);

	/* we removed the dupreq from hashtable, release a ref */
	dupreq_entry_put(dv);

 out:
	return status;
}

/**
 * @brief Decrement the call path refcnt on a cache entry.
 *
 * We assert req->rq_u1 now points to the corresponding duplicate request
 * cache entry (dv).
 *
 * @param[in] req  The svc_req structure.
 * @param[in] func The function descriptor for this request type
 */
void nfs_dupreq_rele(struct svc_req *req, const nfs_function_desc_t *func)
{
	dupreq_entry_t *dv = (dupreq_entry_t *) req->rq_u1;
	drc_t *drc;

	/* no-cache cleanup */
	if (dv == (void *)DUPREQ_NOCACHE) {
		LogFullDebug(COMPONENT_DUPREQ, "releasing no-cache res %p",
			     req->rq_u2);
		func->free_function(req->rq_u2);
		free_nfs_res(req->rq_u2);
		goto out;
	}

	drc = req->rq_xprt->xp_u2;
	LogFullDebug(COMPONENT_DUPREQ,
		     "releasing dv=%p xid=%" PRIu32
		     " on DRC=%p state=%s, refcnt=%d",
		     dv, dv->hin.tcp.rq_xid, drc,
		     dupreq_state_table[dv->state], dv->refcnt);

	/* release req's hold on dupreq and drc */
	dupreq_entry_put(dv);
	nfs_dupreq_put_drc(drc, DRC_FLAG_NONE);

 out:
	/* dispose RPC header */
	if (req->rq_auth)
		SVCAUTH_RELEASE(req);
}

/**
 * @brief Shutdown the dupreq2 package.
 */
void dupreq2_pkgshutdown(void)
{
	/* XXX do nothing */
}