1    	/*
2    	 * vim:noexpandtab:shiftwidth=8:tabstop=8:
3    	 *
4    	 * Copyright CEA/DAM/DIF  (2011)
5    	 * contributeur : Philippe DENIEL   philippe.deniel@cea.fr
6    	 *                Thomas LEIBOVICI  thomas.leibovici@cea.fr
7    	 *
8    	 *
9    	 * This program is free software; you can redistribute it and/or
10   	 * modify it under the terms of the GNU Lesser General Public
11   	 * License as published by the Free Software Foundation; either
12   	 * version 3 of the License, or (at your option) any later version.
13   	 *
14   	 * This program is distributed in the hope that it will be useful,
15   	 * but WITHOUT ANY WARRANTY; without even the implied warranty of
16   	 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17   	 * Lesser General Public License for more details.
18   	 *
19   	 * You should have received a copy of the GNU Lesser General Public
20   	 * License along with this library; if not, write to the Free Software
21   	 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22   	 * 02110-1301 USA
23   	 *
24   	 * ---------------------------------------
25   	 */
26   	
27   	/**
28   	 * \file    9p_dispatcher.c
29   	 * \date    $Date: 2006/02/23 12:33:05 $
30   	 * \brief   9P protocol dispatch thread.
31   	 *
32   	 * The file that contains the '_9p_dispatcher_thread' routine for ganesha
33   	 * (and all the related stuff).
34   	 *
35   	 */
36   	#include "config.h"
37   	#include <stdio.h>
38   	#include <string.h>
39   	#include <pthread.h>
40   	#include <fcntl.h>
41   	#include <sys/file.h>		/* for having FNDELAY */
42   	#include <sys/select.h>
43   	#include <sys/types.h>
44   	#include <sys/socket.h>
45   	#include <netinet/in.h>
46   	#include <netinet/tcp.h>
47   	#include <poll.h>
48   	#include <arpa/inet.h>		/* For inet_ntop() */
49   	#include "hashtable.h"
50   	#include "log.h"
51   	#include "abstract_mem.h"
52   	#include "abstract_atomic.h"
53   	#include "nfs_init.h"
54   	#include "nfs_core.h"
55   	#include "nfs_exports.h"
56   	#include "nfs_proto_functions.h"
57   	#include "nfs_file_handle.h"
58   	#include "9p_req_queue.h"
59   	#include "client_mgr.h"
60   	#include "server_stats.h"
61   	#include "9p.h"
62   	#include <stdbool.h>
63   	#include <urcu-bp.h>
64   	
65   	#define P_FAMILY AF_INET6
66   	
/* Thread fridge (worker pool) that runs the 9P request workers */
static struct fridgethr *_9p_worker_fridge;

static struct _9p_req_st _9p_req_st;	/*< 9P request queues */

/* Human-readable queue names, indexed like req_q_set.qset */
static const char *req_q_s[N_REQ_QUEUES] = {
	"REQ_Q_LOW_LATENCY",
};
74   	
75   	/* static */
76   	uint32_t _9p_outstanding_reqs_est(void)
77   	{
78   		static uint32_t ctr;
79   		static uint32_t nreqs;
80   		struct req_q_pair *qpair;
81   		uint32_t treqs;
82   		int ix;
83   	
84   		if ((atomic_inc_uint32_t(&ctr) % 10) != 0)
85   			return atomic_fetch_uint32_t(&nreqs);
86   	
87   		treqs = 0;
88   		for (ix = 0; ix < N_REQ_QUEUES; ++ix) {
89   			qpair = &(_9p_req_st.reqs._9p_request_q.qset[ix]);
90   			treqs += atomic_fetch_uint32_t(&qpair->producer.size);
91   			treqs += atomic_fetch_uint32_t(&qpair->consumer.size);
92   		}
93   	
94   		atomic_store_uint32_t(&nreqs, treqs);
95   		return treqs;
96   	}
97   	
98   	static inline struct _9p_request_data *_9p_consume_req(struct req_q_pair *qpair)
99   	{
(1) Event assign_zero: Assigning: "reqdata" = "NULL".
Also see events: [var_deref_model]
100  		struct _9p_request_data *reqdata = NULL;
101  	
102  		pthread_spin_lock(&qpair->consumer.sp);
(2) Event cond_false: Condition "qpair->consumer.size > 0", taking false branch.
103  		if (qpair->consumer.size > 0) {
104  			reqdata =
105  			    glist_first_entry(&qpair->consumer.q,
106  					      struct _9p_request_data,
107  					      req_q);
108  			glist_del(&reqdata->req_q);
109  			--(qpair->consumer.size);
110  			pthread_spin_unlock(&qpair->consumer.sp);
111  			goto out;
(3) Event else_branch: Reached else branch.
112  		} else {
113  			char *s = NULL;
114  			uint32_t csize = ~0U;
115  			uint32_t psize = ~0U;
116  	
117  			pthread_spin_lock(&qpair->producer.sp);
(4) Event cond_true: Condition "!!(component_log_level[COMPONENT_DISPATCH] >= NIV_FULL_DEBUG)", taking true branch.
(5) Event cond_true: Condition "!!(component_log_level[COMPONENT_DISPATCH] >= NIV_FULL_DEBUG)", taking true branch.
118  			if (isFullDebug(COMPONENT_DISPATCH)) {
119  				s = (char *)qpair->s;
120  				csize = qpair->consumer.size;
121  				psize = qpair->producer.size;
122  			}
(6) Event cond_true: Condition "qpair->producer.size > 0", taking true branch.
123  			if (qpair->producer.size > 0) {
124  				/* splice */
125  				glist_splice_tail(&qpair->consumer.q,
126  						  &qpair->producer.q);
127  				qpair->consumer.size = qpair->producer.size;
128  				qpair->producer.size = 0;
129  				/* consumer.size > 0 */
130  				pthread_spin_unlock(&qpair->producer.sp);
(7) Event cond_false: Condition "qpair->consumer.q.next != &qpair->consumer.q", taking false branch.
131  				reqdata =
132  				    glist_first_entry(&qpair->consumer.q,
133  						      struct _9p_request_data,
134  						      req_q);
(8) Event var_deref_model: Passing null pointer "&reqdata->req_q" to "glist_del", which dereferences it. [details]
Also see events: [assign_zero]
135  				glist_del(&reqdata->req_q);
136  				--(qpair->consumer.size);
137  				pthread_spin_unlock(&qpair->consumer.sp);
138  				if (s)
139  					LogFullDebug(COMPONENT_DISPATCH,
140  						     "try splice, qpair %s consumer qsize=%u producer qsize=%u",
141  						     s, csize, psize);
142  				goto out;
143  			}
144  	
145  			pthread_spin_unlock(&qpair->producer.sp);
146  			pthread_spin_unlock(&qpair->consumer.sp);
147  	
148  			if (s)
149  				LogFullDebug(COMPONENT_DISPATCH,
150  					     "try splice, qpair %s consumer qsize=%u producer qsize=%u",
151  					     s, csize, psize);
152  		}
153  	 out:
154  		return reqdata;
155  	}
156  	
/**
 * @brief Dequeue a 9P request for a worker, blocking until one arrives.
 *
 * Scans the queue pairs round-robin (the starting slot rotates via the
 * shared reqs.ctr counter).  If all queues are empty, the worker parks
 * itself on the global wait list and sleeps on its per-worker condvar
 * until _9p_enqueue_req() signals it, then retries the scan.
 *
 * @param[in] worker Worker data (provides the wait-queue entry)
 *
 * @return A request, or NULL when the fridge asked the thread to stop.
 */
static struct _9p_request_data *_9p_dequeue_req(struct _9p_worker_data *worker)
{
	struct _9p_request_data *reqdata = NULL;
	struct req_q_set *_9p_request_q = &_9p_req_st.reqs._9p_request_q;
	struct req_q_pair *qpair;
	uint32_t ix, slot;
	struct timespec timeout;

	/* XXX: the following stands in for a more robust/flexible
	 * weighting function */

 retry_deq:
	/* rotate the starting queue so load spreads across qpairs */
	slot = atomic_inc_uint32_t(&_9p_req_st.reqs.ctr) % N_REQ_QUEUES;
	for (ix = 0; ix < N_REQ_QUEUES; ++ix) {
		qpair = &(_9p_request_q->qset[slot]);

		LogFullDebug(COMPONENT_DISPATCH,
			     "dequeue_req try qpair %s %p:%p", qpair->s,
			     &qpair->producer, &qpair->consumer);

		/* anything? */
		reqdata = _9p_consume_req(qpair);
		if (reqdata) {
			break;
		}

		++slot;
		slot = slot % N_REQ_QUEUES;

	}			/* for */

	/* wait */
	if (!reqdata) {
		struct fridgethr_context *ctx =
			container_of(worker, struct fridgethr_context, wd);
		wait_q_entry_t *wqe = &worker->wqe;

		assert(wqe->waiters == 0); /* wqe is not on any wait queue */
		PTHREAD_MUTEX_lock(&wqe->lwe.mtx);
		wqe->flags = Wqe_LFlag_WaitSync;
		wqe->waiters = 1;
		/* XXX functionalize */
		/* publish ourselves on the global wait list; the enqueue
		 * side pops from this list under the same spinlock */
		pthread_spin_lock(&_9p_req_st.reqs.sp);
		glist_add_tail(&_9p_req_st.reqs.wait_list, &wqe->waitq);
		++(_9p_req_st.reqs.waiters);
		pthread_spin_unlock(&_9p_req_st.reqs.sp);
		/* sleep in 5s slices so we can notice a shutdown request
		 * even if no work ever arrives */
		while (!(wqe->flags & Wqe_LFlag_SyncDone)) {
			timeout.tv_sec = time(NULL) + 5;
			timeout.tv_nsec = 0;
			pthread_cond_timedwait(&wqe->lwe.cv, &wqe->lwe.mtx,
					       &timeout);
			if (fridgethr_you_should_break(ctx)) {
				/* We are returning;
				 * so take us out of the waitq */
				pthread_spin_lock(&_9p_req_st.reqs.sp);
				if (wqe->waitq.next != NULL
				    || wqe->waitq.prev != NULL) {
					/* Element is still in wqitq,
					 * remove it */
					glist_del(&wqe->waitq);
					--(_9p_req_st.reqs.waiters);
					--(wqe->waiters);
					wqe->flags &=
					    ~(Wqe_LFlag_WaitSync |
					      Wqe_LFlag_SyncDone);
				}
				pthread_spin_unlock(&_9p_req_st.reqs.sp);
				PTHREAD_MUTEX_unlock(&wqe->lwe.mtx);
				return NULL;
			}
		}

		/* XXX wqe was removed from _9p_req_st.waitq
		 * (by signalling thread) */
		wqe->flags &= ~(Wqe_LFlag_WaitSync | Wqe_LFlag_SyncDone);
		PTHREAD_MUTEX_unlock(&wqe->lwe.mtx);
		LogFullDebug(COMPONENT_DISPATCH, "wqe wakeup %p", wqe);
		goto retry_deq;
	} /* !reqdata */

	return reqdata;
}
239  	
/**
 * @brief Enqueue a 9P request and wake one waiting worker, if any.
 *
 * The request is always appended to the producer queue of the
 * REQ_Q_LOW_LATENCY pair (the only queue currently defined); workers
 * splice producer into consumer in _9p_consume_req().  If a worker is
 * parked on the global wait list, exactly one is removed and signalled.
 *
 * @param[in] reqdata Request to enqueue; ownership passes to the worker.
 */
static void _9p_enqueue_req(struct _9p_request_data *reqdata)
{
	struct req_q_set *_9p_request_q;
	struct req_q_pair *qpair;
	struct req_q *q;

	_9p_request_q = &_9p_req_st.reqs._9p_request_q;

	qpair = &(_9p_request_q->qset[REQ_Q_LOW_LATENCY]);

	/* always append to producer queue */
	q = &qpair->producer;
	pthread_spin_lock(&q->sp);
	glist_add_tail(&q->q, &reqdata->req_q);
	++(q->size);
	pthread_spin_unlock(&q->sp);

	LogDebug(COMPONENT_DISPATCH,
		 "enqueued req, q %p (%s %p:%p) size is %d (enq %"
		 PRIu64 " deq %" PRIu64 ")",
		 q, qpair->s, &qpair->producer, &qpair->consumer, q->size,
		 nfs_health_.enqueued_reqs, nfs_health_.dequeued_reqs);

	/* potentially wakeup some thread */

	/* global waitq */
	{
		wait_q_entry_t *wqe;

		/* SPIN LOCKED */
		pthread_spin_lock(&_9p_req_st.reqs.sp);
		if (_9p_req_st.reqs.waiters) {
			wqe = glist_first_entry(&_9p_req_st.reqs.wait_list,
						wait_q_entry_t, waitq);

			LogFullDebug(COMPONENT_DISPATCH,
				     "_9p_req_st.reqs.waiters %u signal wqe %p (for q %p)",
				     _9p_req_st.reqs.waiters, wqe, q);

			/* release 1 waiter */
			/* the waiter is removed from the list here, under
			 * the spinlock, before it is signalled; the sleeping
			 * side only clears flags (see _9p_dequeue_req) */
			glist_del(&wqe->waitq);
			--(_9p_req_st.reqs.waiters);
			--(wqe->waiters);
			/* ! SPIN LOCKED */
			pthread_spin_unlock(&_9p_req_st.reqs.sp);
			PTHREAD_MUTEX_lock(&wqe->lwe.mtx);
			/* XXX reliable handoff */
			wqe->flags |= Wqe_LFlag_SyncDone;
			if (wqe->flags & Wqe_LFlag_WaitSync)
				pthread_cond_signal(&wqe->lwe.cv);
			PTHREAD_MUTEX_unlock(&wqe->lwe.mtx);
		} else
			/* ! SPIN LOCKED */
			pthread_spin_unlock(&_9p_req_st.reqs.sp);
	}

	return;
}
298  	
299  	/**
300  	 * @brief Execute a 9p request
301  	 *
302  	 * @param[in,out] req9p       9p request
303  	 */
304  	static void _9p_execute(struct _9p_request_data *req9p)
305  	{
306  		struct req_op_context req_ctx;
307  		struct export_perms export_perms;
308  	
309  		memset(&req_ctx, 0, sizeof(struct req_op_context));
310  		memset(&export_perms, 0, sizeof(struct export_perms));
311  		op_ctx = &req_ctx;
312  		op_ctx->caller_addr = (sockaddr_t *)&req9p->pconn->addrpeer;
313  		op_ctx->req_type = _9P_REQUEST;
314  		op_ctx->export_perms = &export_perms;
315  	
316  		if (req9p->pconn->trans_type == _9P_TCP)
317  			_9p_tcp_process_request(req9p);
318  	#ifdef _USE_9P_RDMA
319  		else if (req9p->pconn->trans_type == _9P_RDMA)
320  			_9p_rdma_process_request(req9p);
321  	#endif
322  		op_ctx = NULL;
323  	}				/* _9p_execute */
324  	
325  	/**
326  	 * @brief Free resources allocated for a 9p request
327  	 *
328  	 * This does not free the request itself.
329  	 *
330  	 * @param[in] req9p 9p request
331  	 */
332  	static void _9p_free_reqdata(struct _9p_request_data *req9p)
333  	{
334  		if (req9p->pconn->trans_type == _9P_TCP)
335  			gsh_free(req9p->_9pmsg);
336  	
337  		/* decrease connection refcount */
338  		(void) atomic_dec_uint32_t(&req9p->pconn->refcount);
339  	}
340  	
341  	static uint32_t worker_indexer;
342  	
343  	/**
344  	 * @brief Initialize a worker thread
345  	 *
346  	 * @param[in] ctx Thread fridge context
347  	 */
348  	
349  	static void worker_thread_initializer(struct fridgethr_context *ctx)
350  	{
351  		struct _9p_worker_data *wd = &ctx->wd;
352  		char thr_name[32];
353  	
354  		wd->worker_index = atomic_inc_uint32_t(&worker_indexer);
355  		snprintf(thr_name, sizeof(thr_name), "work-%u", wd->worker_index);
356  		SetNameFunction(thr_name);
357  	
358  		/* Initialize thr waitq */
359  		init_wait_q_entry(&wd->wqe);
360  	}
361  	
362  	/**
363  	 * @brief Finalize a worker thread
364  	 *
365  	 * @param[in] ctx Thread fridge context
366  	 */
367  	
static void worker_thread_finalizer(struct fridgethr_context *ctx)
{
	/* Detach the per-thread info pointer; the pointed-to storage is
	 * owned elsewhere, so it is only cleared here, not freed. */
	ctx->thread_info = NULL;
}
372  	
373  	/**
374  	 * @brief The main function for a worker thread
375  	 *
376  	 * This is the body of the worker thread. Its starting arguments are
377  	 * located in global array worker_data. The argument is no pointer but
378  	 * the worker's index.  It then uses this index to address its own
379  	 * worker data in the array.
380  	 *
381  	 * @param[in] ctx Fridge thread context
382  	 */
383  	
384  	static void _9p_worker_run(struct fridgethr_context *ctx)
385  	{
386  		struct _9p_worker_data *worker_data = &ctx->wd;
387  		struct _9p_request_data *reqdata;
388  		pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
389  		pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
390  	
391  		/* Worker's loop */
392  		while (!fridgethr_you_should_break(ctx)) {
393  			reqdata = _9p_dequeue_req(worker_data);
394  	
395  			if (!reqdata)
396  				continue;
397  	
398  			reqdata->mutex = &mutex;
399  			reqdata->cond = &cond;
400  	
401  			_9p_execute(reqdata);
402  			_9p_free_reqdata(reqdata);
403  	
404  			/* Free the req by releasing the entry */
405  			LogFullDebug(COMPONENT_DISPATCH,
406  				     "Invalidating processed entry");
407  	
408  			gsh_free(reqdata);
409  			(void) atomic_inc_uint64_t(&nfs_health_.dequeued_reqs);
410  		}
411  	
412  		PTHREAD_MUTEX_destroy(&mutex);
413  		PTHREAD_COND_destroy(&cond);
414  	}
415  	
416  	int _9p_worker_init(void)
417  	{
418  		struct fridgethr_params frp;
419  		struct req_q_pair *qpair;
420  		int ix;
421  		int rc = 0;
422  	
423  		/* Init request queue before workers */
424  		pthread_spin_init(&_9p_req_st.reqs.sp, PTHREAD_PROCESS_PRIVATE);
425  		_9p_req_st.reqs.size = 0;
426  		for (ix = 0; ix < N_REQ_QUEUES; ++ix) {
427  			qpair = &(_9p_req_st.reqs._9p_request_q.qset[ix]);
428  			qpair->s = req_q_s[ix];
429  			_9p_rpc_q_init(&qpair->producer);
430  			_9p_rpc_q_init(&qpair->consumer);
431  		}
432  	
433  		/* waitq */
434  		glist_init(&_9p_req_st.reqs.wait_list);
435  		_9p_req_st.reqs.waiters = 0;
436  	
437  		memset(&frp, 0, sizeof(struct fridgethr_params));
438  		frp.thr_max = _9p_param.nb_worker;
439  		frp.thr_min = _9p_param.nb_worker;
440  		frp.flavor = fridgethr_flavor_looper;
441  		frp.thread_initialize = worker_thread_initializer;
442  		frp.thread_finalize = worker_thread_finalizer;
443  		frp.wake_threads = _9p_queue_awaken;
444  		frp.wake_threads_arg = &_9p_req_st;
445  	
446  		rc = fridgethr_init(&_9p_worker_fridge, "9P", &frp);
447  		if (rc != 0) {
448  			LogMajor(COMPONENT_DISPATCH,
449  				 "Unable to initialize worker fridge: %d", rc);
450  			return rc;
451  		}
452  	
453  		rc = fridgethr_populate(_9p_worker_fridge, _9p_worker_run, NULL);
454  	
455  		if (rc != 0) {
456  			LogMajor(COMPONENT_DISPATCH,
457  				 "Unable to populate worker fridge: %d", rc);
458  		}
459  	
460  		return rc;
461  	}
462  	
463  	int _9p_worker_shutdown(void)
464  	{
465  		int rc;
466  	
467  		if (!_9p_worker_fridge)
468  			return 0;
469  	
470  		rc = fridgethr_sync_command(_9p_worker_fridge, fridgethr_comm_stop,
471  					    120);
472  	
473  		if (rc == ETIMEDOUT) {
474  			LogMajor(COMPONENT_DISPATCH,
475  				 "Shutdown timed out, cancelling threads.");
476  			fridgethr_cancel(_9p_worker_fridge);
477  		} else if (rc != 0) {
478  			LogMajor(COMPONENT_DISPATCH,
479  				 "Failed shutting down worker threads: %d", rc);
480  		}
481  		return rc;
482  	}
483  	
484  	void DispatchWork9P(struct _9p_request_data *req)
485  	{
486  		switch (req->pconn->trans_type) {
487  		case _9P_TCP:
488  			LogDebug(COMPONENT_DISPATCH,
489  				 "Dispatching 9P/TCP request %p, tcpsock=%lu",
490  				 req, req->pconn->trans_data.sockfd);
491  			break;
492  	
493  		case _9P_RDMA:
494  			LogDebug(COMPONENT_DISPATCH,
495  				 "Dispatching 9P/RDMA request %p", req);
496  			break;
497  	
498  		default:
499  			LogCrit(COMPONENT_DISPATCH,
500  				"/!\\ Implementation error, bad 9P transport type");
501  			return;
502  		}
503  	
504  		/* increase connection refcount */
505  		(void) atomic_inc_uint32_t(&req->pconn->refcount);
506  	
507  		/* new-style dispatch */
508  		_9p_enqueue_req(req);
509  	}
510  	
511  	/**
512  	 * _9p_socket_thread: 9p socket manager.
513  	 *
514  	 * This function is the main loop for the 9p socket manager.
515  	 * One such thread exists per connection.
516  	 *
517  	 * @param Arg the socket number cast as a void * in pthread_create
518  	 *
519  	 * @return NULL
520  	 *
521  	 */
522  	
523  	void *_9p_socket_thread(void *Arg)
524  	{
525  		long int tcp_sock = (long int)Arg;
526  		int rc = -1;
527  		struct pollfd fds[1];
528  		int fdcount = 1;
529  		static char my_name[MAXNAMLEN + 1];
530  		char strcaller[INET6_ADDRSTRLEN];
531  		struct _9p_request_data *req = NULL;
532  		int tag;
533  		unsigned long sequence = 0;
534  		unsigned int i = 0;
535  		char *_9pmsg = NULL;
536  		uint32_t msglen;
537  	
538  		struct _9p_conn _9p_conn;
539  		socklen_t addrpeerlen;
540  	
541  		int readlen = 0;
542  		int total_readlen = 0;
543  	
544  		snprintf(my_name, MAXNAMLEN, "9p_sock_mgr#fd=%ld", tcp_sock);
545  		SetNameFunction(my_name);
546  		rcu_register_thread();
547  	
548  		/* Init the struct _9p_conn structure */
549  		memset(&_9p_conn, 0, sizeof(_9p_conn));
550  		PTHREAD_MUTEX_init(&_9p_conn.sock_lock, NULL);
551  		_9p_conn.trans_type = _9P_TCP;
552  		_9p_conn.trans_data.sockfd = tcp_sock;
553  		for (i = 0; i < FLUSH_BUCKETS; i++) {
554  			PTHREAD_MUTEX_init(&_9p_conn.flush_buckets[i].lock, NULL);
555  			glist_init(&_9p_conn.flush_buckets[i].list);
556  		}
557  		atomic_store_uint32_t(&_9p_conn.refcount, 0);
558  	
559  		/* Init the fids pointers array */
560  		memset(&_9p_conn.fids, 0, _9P_FID_PER_CONN * sizeof(struct _9p_fid *));
561  	
562  		/* Set initial msize.
563  		 * Client may request a lower value during TVERSION */
564  		_9p_conn.msize = _9p_param._9p_tcp_msize;
565  	
566  		if (gettimeofday(&_9p_conn.birth, NULL) == -1)
567  			LogFatal(COMPONENT_9P, "Cannot get connection's time of birth");
568  	
569  		addrpeerlen = sizeof(_9p_conn.addrpeer);
570  		rc = getpeername(tcp_sock, (struct sockaddr *)&_9p_conn.addrpeer,
571  				 &addrpeerlen);
572  		if (rc == -1) {
573  			LogMajor(COMPONENT_9P,
574  				 "Cannot get peername to tcp socket for 9p, error %d (%s)",
575  				 errno, strerror(errno));
576  			/* XXX */
577  			strlcpy(strcaller, "(unresolved)", INET6_ADDRSTRLEN);
578  			goto end;
579  		} else {
580  			switch (_9p_conn.addrpeer.ss_family) {
581  			case AF_INET:
582  				inet_ntop(_9p_conn.addrpeer.ss_family,
583  					  &((struct sockaddr_in *)&_9p_conn.addrpeer)->
584  					  sin_addr, strcaller, INET6_ADDRSTRLEN);
585  				break;
586  			case AF_INET6:
587  				inet_ntop(_9p_conn.addrpeer.ss_family,
588  					  &((struct sockaddr_in6 *)&_9p_conn.addrpeer)->
589  					  sin6_addr, strcaller, INET6_ADDRSTRLEN);
590  				break;
591  			default:
592  				snprintf(strcaller, INET6_ADDRSTRLEN, "BAD ADDRESS");
593  				break;
594  			}
595  	
596  			LogEvent(COMPONENT_9P, "9p socket #%ld is connected to %s",
597  				 tcp_sock, strcaller);
598  		}
599  		_9p_conn.client = get_gsh_client(&_9p_conn.addrpeer, false);
600  	
601  		/* Set up the structure used by poll */
602  		memset((char *)fds, 0, sizeof(struct pollfd));
603  		fds[0].fd = tcp_sock;
604  		fds[0].events =
605  		    POLLIN | POLLPRI | POLLRDBAND | POLLRDNORM | POLLRDHUP | POLLHUP |
606  		    POLLERR | POLLNVAL;
607  	
608  		for (;;) {
609  			total_readlen = 0;  /* new message */
610  			rc = poll(fds, fdcount, -1);
611  			if (rc == -1) {
612  				/* timeout = -1 => Wait indefinitely for events */
613  				/* Interruption if not an issue */
614  				if (errno == EINTR)
615  					continue;
616  	
617  				LogCrit(COMPONENT_9P,
618  					"Got error %u (%s) on fd %ld connect to %s while polling on socket",
619  					errno, strerror(errno), tcp_sock, strcaller);
620  			}
621  	
622  			if (fds[0].revents & POLLNVAL) {
623  				LogEvent(COMPONENT_9P,
624  					 "Client %s on socket %lu produced POLLNVAL",
625  					 strcaller, tcp_sock);
626  				goto end;
627  			}
628  	
629  			if (fds[0].revents & (POLLERR | POLLHUP | POLLRDHUP)) {
630  				LogEvent(COMPONENT_9P,
631  					 "Client %s on socket %lu has shut down and closed",
632  					 strcaller, tcp_sock);
633  				goto end;
634  			}
635  	
636  			if (!(fds[0].revents & (POLLIN | POLLRDNORM)))
637  				continue;
638  	
639  			/* Prepare to read the message */
640  			_9pmsg = gsh_malloc(_9p_conn.msize);
641  	
642  			/* An incoming 9P request: the msg has a 4 bytes header
643  			   showing the size of the msg including the header */
644  			readlen = recv(fds[0].fd, _9pmsg,
645  				       _9P_HDR_SIZE, MSG_WAITALL);
646  			if (readlen != _9P_HDR_SIZE)
647  				goto badmsg;
648  	
649  			msglen = *(uint32_t *) _9pmsg;
650  			if (msglen > _9p_conn.msize) {
651  				LogCrit(COMPONENT_9P,
652  					"Message size too big! got %u, max = %u",
653  					msglen, _9p_conn.msize);
654  				goto end;
655  			}
656  	
657  			LogFullDebug(COMPONENT_9P,
658  				     "Received 9P/TCP message of size %u from client %s on socket %lu",
659  				     msglen, strcaller, tcp_sock);
660  	
661  			total_readlen += readlen;
662  			while (total_readlen < msglen) {
663  				readlen = recv(fds[0].fd,
664  					       _9pmsg + total_readlen,
665  					       msglen - total_readlen,
666  					       0);
667  	
668  				if (readlen > 0) {
669  					total_readlen += readlen;
670  					continue;
671  				}
672  				if (readlen == 0 ||
673  				    (readlen < 0 && errno != EINTR))
674  					goto badmsg;
675  			}	/* while */
676  	
677  			server_stats_transport_done(_9p_conn.client,
678  						    total_readlen, 1, 0,
679  						    0, 0, 0);
680  	
681  			/* Message is good. */
682  			(void) atomic_inc_uint64_t(&nfs_health_.enqueued_reqs);
683  			req = gsh_calloc(1, sizeof(struct _9p_request_data));
684  	
685  			req->_9pmsg = _9pmsg;
686  			req->pconn = &_9p_conn;
687  	
688  			/* Add this request to the request list,
689  			 * should it be flushed later. */
690  			tag = *(u16 *) (_9pmsg + _9P_HDR_SIZE + _9P_TYPE_SIZE);
691  			_9p_AddFlushHook(req, tag, sequence++);
692  			LogFullDebug(COMPONENT_9P,
693  				     "Request tag is %d\n", tag);
694  	
695  			/* Message was OK push it */
696  			DispatchWork9P(req);
697  	
698  			/* Not our buffer anymore */
699  			_9pmsg = NULL;
700  			continue;
701  	
702  	badmsg:
703  			if (readlen == 0)
704  				LogEvent(COMPONENT_9P,
705  					 "Premature end for Client %s on socket %lu, total read = %u",
706  					 strcaller, tcp_sock, total_readlen);
707  			else if (readlen < 0) {
708  				LogEvent(COMPONENT_9P,
709  					 "Read error client %s on socket %lu errno=%d, total read = %u",
710  					 strcaller, tcp_sock,
711  					 errno, total_readlen);
712  			} else
713  				LogEvent(COMPONENT_9P,
714  					 "Header too small! for client %s on socket %lu: readlen=%u expected=%u",
715  					 strcaller, tcp_sock, readlen,
716  					 _9P_HDR_SIZE);
717  	
718  			/* Either way, we close the connection.
719  			 * It is not possible to survive
720  			 * once we get out of sync in the TCP stream
721  			 * with the client
722  			 */
723  			break; /* bail out */
724  		}			/* for( ;; ) */
725  	
726  	end:
727  		LogEvent(COMPONENT_9P, "Closing connection on socket %lu", tcp_sock);
728  		close(tcp_sock);
729  	
730  		/* Free buffer if we encountered an error
731  		 * before we could give it to a worker */
732  		if (_9pmsg)
733  			gsh_free(_9pmsg);
734  	
735  		while (atomic_fetch_uint32_t(&_9p_conn.refcount)) {
736  			LogEvent(COMPONENT_9P, "Waiting for workers to release pconn");
737  			sleep(1);
738  		}
739  	
740  		_9p_cleanup_fids(&_9p_conn);
741  	
742  		if (_9p_conn.client != NULL)
743  			put_gsh_client(_9p_conn.client);
744  	
745  		rcu_unregister_thread();
746  		pthread_exit(NULL);
747  	}				/* _9p_socket_thread */
748  	
749  	/**
750  	 * _9p_create_socket_V4 : create the socket and bind for 9P using
751  	 * the available V4 interfaces on the host. This is not the default
752  	 * for ganesha. We expect _9p_create_socket_V6 to be called first
753  	 * and succeed. Only when the V6 function returns failure is this
754  	 * function expected to be called. See _9p_create_socket().
755  	 *
756  	 * @return socket fd or -1 in case of failure
757  	 *
758  	 */
759  	static int _9p_create_socket_V4(void)
760  	{
761  		int			sock = -1;
762  		int			one = 1;
763  		int			centvingt = 120;
764  		int			neuf = 9;
765  		struct	netbuf		netbuf_tcp;
766  		struct	t_bind		bindaddr_tcp;
767  		struct	__rpc_sockinfo	si_tcp;
768  		struct	sockaddr_in	sinaddr_tcp;
769  	
770  		sock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
771  		if (sock == -1) {
772  			LogWarn(COMPONENT_9P_DISPATCH,
773  				"Error creating 9p V4 socket, error %d(%s)",
774  				errno, strerror(errno));
775  			return -1;
776  		}
777  	
778  		if ((setsockopt(sock,
779  				SOL_SOCKET, SO_REUSEADDR,
780  				&one, sizeof(one)) == -1) ||
781  		    (setsockopt(sock,
782  				IPPROTO_TCP, TCP_NODELAY,
783  				&one, sizeof(one)) == -1) ||
784  		    (setsockopt(sock,
785  				IPPROTO_TCP, TCP_KEEPIDLE,
786  				&centvingt, sizeof(centvingt)) == -1) ||
787  		    (setsockopt(sock,
788  				IPPROTO_TCP, TCP_KEEPINTVL,
789  				&centvingt, sizeof(centvingt)) == -1) ||
790  		    (setsockopt(sock,
791  				IPPROTO_TCP, TCP_KEEPCNT,
792  				&neuf, sizeof(neuf)) == -1)) {
793  			LogWarn(COMPONENT_9P_DISPATCH,
794  				"Error setting 9p V4 socket option, error %d(%s)",
795  				errno, strerror(errno));
796  			goto err;
797  		}
798  	
799  		memset(&sinaddr_tcp, 0, sizeof(sinaddr_tcp));
800  		sinaddr_tcp.sin_family = AF_INET;
801  	
802  		/* All the interfaces on the machine are used */
803  		sinaddr_tcp.sin_addr.s_addr = htonl(INADDR_ANY);
804  		sinaddr_tcp.sin_port = htons(_9p_param._9p_tcp_port);
805  	
806  		netbuf_tcp.maxlen = sizeof(sinaddr_tcp);
807  		netbuf_tcp.len = sizeof(sinaddr_tcp);
808  		netbuf_tcp.buf = &sinaddr_tcp;
809  	
810  		bindaddr_tcp.qlen = SOMAXCONN;
811  		bindaddr_tcp.addr = netbuf_tcp;
812  	
813  		if (!__rpc_fd2sockinfo(sock, &si_tcp)) {
814  			LogWarn(COMPONENT_9P_DISPATCH,
815  				"Cannot get 9p socket info for tcp V4 socket error %d(%s)",
816  				errno, strerror(errno));
817  			goto err;
818  		}
819  	
820  		if (bind(sock,
821  			 (struct sockaddr *)bindaddr_tcp.addr.buf,
822  			 (socklen_t) si_tcp.si_alen) == -1) {
823  			LogWarn(COMPONENT_9P_DISPATCH,
824  				"Cannot bind 9p tcp V4 socket, error %d(%s)", errno,
825  				strerror(errno));
826  			goto err;
827  		}
828  	
829  		if (listen(sock, 20) == -1) {
830  			LogWarn(COMPONENT_9P_DISPATCH,
831  				"Cannot bind 9p tcp V4 socket, error %d(%s)", errno,
832  				strerror(errno));
833  			goto err;
834  		}
835  	
836  		return sock;
837  	
838  	err:
839  	
840  		close(sock);
841  		return -1;
842  	}
843  	
844  	/**
845  	 * _9p_create_socket_V6 : create the socket and bind for 9P using
846  	 * the available V6 interfaces on the host
847  	 *
848  	 * @return socket fd or -1 in case of failure
849  	 *
850  	 */
851  	static int _9p_create_socket_V6(void)
852  	{
853  		int sock = -1;
854  		int one	= 1;
855  		int centvingt = 120;
856  		int neuf = 9;
857  		struct sockaddr_in6 sinaddr_tcp6;
858  		struct netbuf netbuf_tcp6;
859  		struct t_bind bindaddr_tcp6;
860  		struct __rpc_sockinfo si_tcp6;
861  	
862  		sock = socket(P_FAMILY, SOCK_STREAM, IPPROTO_TCP);
863  		if (sock == -1) {
864  			if (errno == EAFNOSUPPORT) {
865  				LogWarn(COMPONENT_9P_DISPATCH,
866  					"Error creating socket, V6 intfs disabled? error %d(%s)",
867  					errno, strerror(errno));
868  				return _9p_create_socket_V4();
869  			}
870  	
871  			return -1;
872  		}
873  	
874  		if ((setsockopt(sock,
875  				SOL_SOCKET, SO_REUSEADDR,
876  				&one, sizeof(one)) == -1) ||
877  		    (setsockopt(sock,
878  				IPPROTO_TCP, TCP_NODELAY,
879  				&one, sizeof(one)) == -1) ||
880  		    (setsockopt(sock,
881  				IPPROTO_TCP, TCP_KEEPIDLE,
882  				&centvingt, sizeof(centvingt)) == -1) ||
883  		    (setsockopt(sock,
884  				IPPROTO_TCP, TCP_KEEPINTVL,
885  				&centvingt, sizeof(centvingt)) == -1) ||
886  		    (setsockopt(sock,
887  				IPPROTO_TCP, TCP_KEEPCNT,
888  				&neuf, sizeof(neuf)) == -1)) {
889  			LogWarn(COMPONENT_9P_DISPATCH,
890  				"Error setting V6 socket option, error %d(%s)",
891  				errno, strerror(errno));
892  			goto err;
893  		}
894  	
895  		memset(&sinaddr_tcp6, 0, sizeof(sinaddr_tcp6));
896  		sinaddr_tcp6.sin6_family = AF_INET6;
897  		/* All the interfaces on the machine are used */
898  		sinaddr_tcp6.sin6_addr = in6addr_any;
899  		sinaddr_tcp6.sin6_port = htons(_9p_param._9p_tcp_port);
900  	
901  		netbuf_tcp6.maxlen = sizeof(sinaddr_tcp6);
902  		netbuf_tcp6.len = sizeof(sinaddr_tcp6);
903  		netbuf_tcp6.buf = &sinaddr_tcp6;
904  	
905  		bindaddr_tcp6.qlen = SOMAXCONN;
906  		bindaddr_tcp6.addr = netbuf_tcp6;
907  	
908  		if (!__rpc_fd2sockinfo(sock, &si_tcp6)) {
909  			LogWarn(COMPONENT_9P_DISPATCH,
910  				"Cannot get 9p socket info for tcp6 socket error %d(%s)",
911  				errno, strerror(errno));
912  			goto err;
913  		}
914  	
915  		if (bind(sock,
916  			 (struct sockaddr *)bindaddr_tcp6.addr.buf,
917  			 (socklen_t) si_tcp6.si_alen) == -1) {
918  			LogWarn(COMPONENT_9P_DISPATCH,
919  				"Cannot bind 9p tcp6 socket, error %d (%s)", errno,
920  				strerror(errno));
921  			goto err;
922  		}
923  	
924  		if (listen(sock, 20) == -1) {
925  			LogWarn(COMPONENT_9P_DISPATCH,
926  				"Cannot bind 9p tcp6 socket, error %d (%s)", errno,
927  				strerror(errno));
928  			goto err;
929  		}
930  	
931  		return sock;
932  	
933  	err:
934  	
935  		close(sock);
936  		return -1;
937  	}
938  	
939  	/**
940  	 * _9p_dispatcher_thread: thread used for RPC dispatching.
941  	 *
942  	 * This function is the main loop for the 9p dispatcher.
943  	 * It never returns because it is an infinite loop.
944  	 *
945  	 * @param Arg (unused)
946  	 *
947  	 * @return Pointer to the result (but this function will mostly loop forever).
948  	 *
949  	 */
950  	void *_9p_dispatcher_thread(void *Arg)
951  	{
952  		int _9p_socket;
953  		int rc = 0;
954  		long int newsock = -1;
955  		pthread_attr_t attr_thr;
956  		pthread_t tcp_thrid;
957  	
958  		SetNameFunction("_9p_disp");
959  	
960  		rcu_register_thread();
961  	
962  		/* Calling dispatcher main loop */
963  		LogInfo(COMPONENT_9P_DISPATCH, "Entering 9P dispatcher");
964  	
965  		LogDebug(COMPONENT_9P_DISPATCH, "My pthread id is %p",
966  			 (void *) pthread_self());
967  	
968  		/* Set up the _9p_socket (trying V6 first, will fall back to V4
969  		 * if V6 fails).
970  		 */
971  		_9p_socket = _9p_create_socket_V6();
972  	
973  		if (_9p_socket == -1) {
974  			LogFatal(COMPONENT_9P_DISPATCH,
975  				 "Can't get socket for 9p dispatcher");
976  		}
977  	
978  		/* Init for thread parameter (mostly for scheduling) */
979  		if (pthread_attr_init(&attr_thr) != 0)
980  			LogDebug(COMPONENT_9P_DISPATCH,
981  				 "can't init pthread's attributes");
982  	
983  		if (pthread_attr_setscope(&attr_thr, PTHREAD_SCOPE_SYSTEM) != 0)
984  			LogDebug(COMPONENT_9P_DISPATCH, "can't set pthread's scope");
985  	
986  		if (pthread_attr_setdetachstate(&attr_thr,
987  						PTHREAD_CREATE_DETACHED) != 0)
988  			LogDebug(COMPONENT_9P_DISPATCH,
989  				 "can't set pthread's join state");
990  	
991  		LogEvent(COMPONENT_9P_DISPATCH, "9P dispatcher started");
992  	
993  		while (true) {
994  			newsock = accept(_9p_socket, NULL, NULL);
995  	
996  			if (newsock < 0) {
997  				LogCrit(COMPONENT_9P_DISPATCH, "accept failed: %d",
998  					errno);
999  				continue;
1000 			}
1001 	
1002 			/* Starting the thread dedicated to signal handling */
1003 			rc = pthread_create(&tcp_thrid, &attr_thr,
1004 					    _9p_socket_thread, (void *)newsock);
1005 			if (rc != 0) {
1006 				LogFatal(COMPONENT_THREAD,
1007 					 "Could not create 9p socket manager thread, error = %d (%s)",
1008 					 errno, strerror(errno));
1009 			}
1010 		}			/* while */
1011 	
1012 		close(_9p_socket);
1013 		pthread_attr_destroy(&attr_thr);
1014 	
1015 		rcu_unregister_thread();
1016 		return NULL;
1017 	}				/* _9p_dispatcher_thread */
1018