1 /*
2 * vim:noexpandtab:shiftwidth=8:tabstop=8:
3 *
4 * Copyright CEA/DAM/DIF (2011)
5 * contributeur : Philippe DENIEL philippe.deniel@cea.fr
6 * Thomas LEIBOVICI thomas.leibovici@cea.fr
7 *
8 *
9 * This program is free software; you can redistribute it and/or
10 * modify it under the terms of the GNU Lesser General Public
11 * License as published by the Free Software Foundation; either
12 * version 3 of the License, or (at your option) any later version.
13 *
14 * This program is distributed in the hope that it will be useful,
15 * but WITHOUT ANY WARRANTY; without even the implied warranty of
16 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17 * Lesser General Public License for more details.
18 *
19 * You should have received a copy of the GNU Lesser General Public
20 * License along with this library; if not, write to the Free Software
21 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 * 02110-1301 USA
23 *
24 * ---------------------------------------
25 */
26
27 /**
28 * \file 9p_dispatcher.c
29 * \date $Date: 2006/02/23 12:33:05 $
30 * \brief 9P protocol dispatch thread.
31 *
32 * The file that contains the '_9p_dispatcher_thread' routine for ganesha
33 * (and all the related stuff).
34 *
35 */
36 #include "config.h"
37 #include <stdio.h>
38 #include <string.h>
39 #include <pthread.h>
40 #include <fcntl.h>
41 #include <sys/file.h> /* for having FNDELAY */
42 #include <sys/select.h>
43 #include <sys/types.h>
44 #include <sys/socket.h>
45 #include <netinet/in.h>
46 #include <netinet/tcp.h>
47 #include <poll.h>
48 #include <arpa/inet.h> /* For inet_ntop() */
49 #include "hashtable.h"
50 #include "log.h"
51 #include "abstract_mem.h"
52 #include "abstract_atomic.h"
53 #include "nfs_init.h"
54 #include "nfs_core.h"
55 #include "nfs_exports.h"
56 #include "nfs_proto_functions.h"
57 #include "nfs_file_handle.h"
58 #include "9p_req_queue.h"
59 #include "client_mgr.h"
60 #include "server_stats.h"
61 #include "9p.h"
62 #include <stdbool.h>
63 #include <urcu-bp.h>
64
65 #define P_FAMILY AF_INET6
66
67 static struct fridgethr *_9p_worker_fridge;
68
69 static struct _9p_req_st _9p_req_st; /*< 9P request queues */
70
71 static const char *req_q_s[N_REQ_QUEUES] = {
72 "REQ_Q_LOW_LATENCY",
73 };
74
75 /* static */
76 uint32_t _9p_outstanding_reqs_est(void)
77 {
78 static uint32_t ctr;
79 static uint32_t nreqs;
80 struct req_q_pair *qpair;
81 uint32_t treqs;
82 int ix;
83
84 if ((atomic_inc_uint32_t(&ctr) % 10) != 0)
85 return atomic_fetch_uint32_t(&nreqs);
86
87 treqs = 0;
88 for (ix = 0; ix < N_REQ_QUEUES; ++ix) {
89 qpair = &(_9p_req_st.reqs._9p_request_q.qset[ix]);
90 treqs += atomic_fetch_uint32_t(&qpair->producer.size);
91 treqs += atomic_fetch_uint32_t(&qpair->consumer.size);
92 }
93
94 atomic_store_uint32_t(&nreqs, treqs);
95 return treqs;
96 }
97
/**
 * @brief Pop one request from a queue pair.
 *
 * The consumer queue is tried first; when it is empty, the producer
 * queue is spliced onto it (under both spinlocks) and the head of the
 * freshly filled consumer queue is returned.
 *
 * Lock order here is consumer.sp then producer.sp; _9p_enqueue_req()
 * only ever takes the producer lock by itself, so the nesting cannot
 * deadlock.
 *
 * @param[in] qpair queue pair to consume from
 *
 * @return a dequeued request, or NULL if both queues were empty.
 */
static inline struct _9p_request_data *_9p_consume_req(struct req_q_pair *qpair)
{
	struct _9p_request_data *reqdata = NULL;

	pthread_spin_lock(&qpair->consumer.sp);
	if (qpair->consumer.size > 0) {
		/* Fast path: consumer queue non-empty, pop its head. */
		reqdata =
		    glist_first_entry(&qpair->consumer.q,
				      struct _9p_request_data,
				      req_q);
		glist_del(&reqdata->req_q);
		--(qpair->consumer.size);
		pthread_spin_unlock(&qpair->consumer.sp);
		goto out;
	} else {
		char *s = NULL;
		uint32_t csize = ~0U;
		uint32_t psize = ~0U;

		pthread_spin_lock(&qpair->producer.sp);
		/* Snapshot sizes for the debug trace before they are
		 * modified by the splice below. */
		if (isFullDebug(COMPONENT_DISPATCH)) {
			s = (char *)qpair->s;
			csize = qpair->consumer.size;
			psize = qpair->producer.size;
		}
		if (qpair->producer.size > 0) {
			/* splice */
			glist_splice_tail(&qpair->consumer.q,
					  &qpair->producer.q);
			qpair->consumer.size = qpair->producer.size;
			qpair->producer.size = 0;
			/* consumer.size > 0 */
			pthread_spin_unlock(&qpair->producer.sp);
			reqdata =
			    glist_first_entry(&qpair->consumer.q,
					      struct _9p_request_data,
					      req_q);
			glist_del(&reqdata->req_q);
			--(qpair->consumer.size);
			pthread_spin_unlock(&qpair->consumer.sp);
			if (s)
				LogFullDebug(COMPONENT_DISPATCH,
					     "try splice, qpair %s consumer qsize=%u producer qsize=%u",
					     s, csize, psize);
			goto out;
		}

		/* Both queues empty: release in reverse acquisition
		 * order and return NULL. */
		pthread_spin_unlock(&qpair->producer.sp);
		pthread_spin_unlock(&qpair->consumer.sp);

		if (s)
			LogFullDebug(COMPONENT_DISPATCH,
				     "try splice, qpair %s consumer qsize=%u producer qsize=%u",
				     s, csize, psize);
	}
 out:
	return reqdata;
}
156
/**
 * @brief Dequeue the next 9P request for a worker thread.
 *
 * Scans the queue pairs round-robin, starting at a position taken
 * from a shared rotor counter.  If every queue is empty, the worker
 * parks itself on the global wait list and sleeps in 5-second slices
 * — the bounded wait is what lets the loop poll
 * fridgethr_you_should_break() even if no wakeup signal ever arrives
 * — until _9p_enqueue_req() signals it, then retries the scan.
 *
 * @param[in] worker this worker's private data (holds its waitq entry)
 *
 * @return a request to process, or NULL if the fridge asked this
 *         thread to exit.
 */
static struct _9p_request_data *_9p_dequeue_req(struct _9p_worker_data *worker)
{
	struct _9p_request_data *reqdata = NULL;
	struct req_q_set *_9p_request_q = &_9p_req_st.reqs._9p_request_q;
	struct req_q_pair *qpair;
	uint32_t ix, slot;
	struct timespec timeout;

	/* XXX: the following stands in for a more robust/flexible
	 * weighting function */

 retry_deq:
	slot = atomic_inc_uint32_t(&_9p_req_st.reqs.ctr) % N_REQ_QUEUES;
	for (ix = 0; ix < N_REQ_QUEUES; ++ix) {
		qpair = &(_9p_request_q->qset[slot]);

		LogFullDebug(COMPONENT_DISPATCH,
			     "dequeue_req try qpair %s %p:%p", qpair->s,
			     &qpair->producer, &qpair->consumer);

		/* anything? */
		reqdata = _9p_consume_req(qpair);
		if (reqdata) {
			break;
		}

		++slot;
		slot = slot % N_REQ_QUEUES;

	} /* for */

	/* wait */
	if (!reqdata) {
		struct fridgethr_context *ctx =
			container_of(worker, struct fridgethr_context, wd);
		wait_q_entry_t *wqe = &worker->wqe;

		assert(wqe->waiters == 0); /* wqe is not on any wait queue */
		PTHREAD_MUTEX_lock(&wqe->lwe.mtx);
		wqe->flags = Wqe_LFlag_WaitSync;
		wqe->waiters = 1;
		/* XXX functionalize */
		pthread_spin_lock(&_9p_req_st.reqs.sp);
		glist_add_tail(&_9p_req_st.reqs.wait_list, &wqe->waitq);
		++(_9p_req_st.reqs.waiters);
		pthread_spin_unlock(&_9p_req_st.reqs.sp);
		/* Loop on the flag: pthread_cond_timedwait() may wake
		 * spuriously or time out without SyncDone being set. */
		while (!(wqe->flags & Wqe_LFlag_SyncDone)) {
			timeout.tv_sec = time(NULL) + 5;
			timeout.tv_nsec = 0;
			pthread_cond_timedwait(&wqe->lwe.cv, &wqe->lwe.mtx,
					       &timeout);
			if (fridgethr_you_should_break(ctx)) {
				/* We are returning;
				 * so take us out of the waitq */
				pthread_spin_lock(&_9p_req_st.reqs.sp);
				/* Non-NULL links mean the enqueuer has
				 * not removed us from the wait list. */
				if (wqe->waitq.next != NULL
				    || wqe->waitq.prev != NULL) {
					/* Element is still in wqitq,
					 * remove it */
					glist_del(&wqe->waitq);
					--(_9p_req_st.reqs.waiters);
					--(wqe->waiters);
					wqe->flags &=
					    ~(Wqe_LFlag_WaitSync |
					      Wqe_LFlag_SyncDone);
				}
				pthread_spin_unlock(&_9p_req_st.reqs.sp);
				PTHREAD_MUTEX_unlock(&wqe->lwe.mtx);
				return NULL;
			}
		}

		/* XXX wqe was removed from _9p_req_st.waitq
		 * (by signalling thread) */
		wqe->flags &= ~(Wqe_LFlag_WaitSync | Wqe_LFlag_SyncDone);
		PTHREAD_MUTEX_unlock(&wqe->lwe.mtx);
		LogFullDebug(COMPONENT_DISPATCH, "wqe wakeup %p", wqe);
		goto retry_deq;
	} /* !reqdata */

	return reqdata;
}
239
/**
 * @brief Enqueue a 9P request and wake one idle worker.
 *
 * The request is always appended to the producer side of the single
 * REQ_Q_LOW_LATENCY queue pair; workers later splice producer onto
 * consumer in _9p_consume_req().  If any worker is parked on the
 * global wait list, the first one is removed and signalled.
 *
 * @param[in] reqdata request to queue (ownership passes to the worker
 *                    that dequeues it)
 */
static void _9p_enqueue_req(struct _9p_request_data *reqdata)
{
	struct req_q_set *_9p_request_q;
	struct req_q_pair *qpair;
	struct req_q *q;

	_9p_request_q = &_9p_req_st.reqs._9p_request_q;

	qpair = &(_9p_request_q->qset[REQ_Q_LOW_LATENCY]);

	/* always append to producer queue */
	q = &qpair->producer;
	pthread_spin_lock(&q->sp);
	glist_add_tail(&q->q, &reqdata->req_q);
	++(q->size);
	pthread_spin_unlock(&q->sp);

	LogDebug(COMPONENT_DISPATCH,
		 "enqueued req, q %p (%s %p:%p) size is %d (enq %"
		 PRIu64 " deq %" PRIu64 ")",
		 q, qpair->s, &qpair->producer, &qpair->consumer, q->size,
		 nfs_health_.enqueued_reqs, nfs_health_.dequeued_reqs);

	/* potentially wakeup some thread */

	/* global waitq */
	{
		wait_q_entry_t *wqe;

		/* SPIN LOCKED */
		pthread_spin_lock(&_9p_req_st.reqs.sp);
		if (_9p_req_st.reqs.waiters) {
			wqe = glist_first_entry(&_9p_req_st.reqs.wait_list,
						wait_q_entry_t, waitq);

			LogFullDebug(COMPONENT_DISPATCH,
				     "_9p_req_st.reqs.waiters %u signal wqe %p (for q %p)",
				     _9p_req_st.reqs.waiters, wqe, q);

			/* release 1 waiter */
			glist_del(&wqe->waitq);
			--(_9p_req_st.reqs.waiters);
			--(wqe->waiters);
			/* ! SPIN LOCKED */
			/* Drop the spinlock before taking the waiter's
			 * mutex: never sleep on a mutex while holding
			 * a spinlock. */
			pthread_spin_unlock(&_9p_req_st.reqs.sp);
			PTHREAD_MUTEX_lock(&wqe->lwe.mtx);
			/* XXX reliable handoff */
			/* SyncDone is set under the waiter's mutex so
			 * the flag cannot be missed between its flag
			 * test and its cond wait. */
			wqe->flags |= Wqe_LFlag_SyncDone;
			if (wqe->flags & Wqe_LFlag_WaitSync)
				pthread_cond_signal(&wqe->lwe.cv);
			PTHREAD_MUTEX_unlock(&wqe->lwe.mtx);
		} else
			/* ! SPIN LOCKED */
			pthread_spin_unlock(&_9p_req_st.reqs.sp);
	}

	return;
}
298
299 /**
300 * @brief Execute a 9p request
301 *
302 * @param[in,out] req9p 9p request
303 */
304 static void _9p_execute(struct _9p_request_data *req9p)
305 {
306 struct req_op_context req_ctx;
307 struct export_perms export_perms;
308
309 memset(&req_ctx, 0, sizeof(struct req_op_context));
310 memset(&export_perms, 0, sizeof(struct export_perms));
311 op_ctx = &req_ctx;
312 op_ctx->caller_addr = (sockaddr_t *)&req9p->pconn->addrpeer;
313 op_ctx->req_type = _9P_REQUEST;
314 op_ctx->export_perms = &export_perms;
315
316 if (req9p->pconn->trans_type == _9P_TCP)
317 _9p_tcp_process_request(req9p);
318 #ifdef _USE_9P_RDMA
319 else if (req9p->pconn->trans_type == _9P_RDMA)
320 _9p_rdma_process_request(req9p);
321 #endif
322 op_ctx = NULL;
323 } /* _9p_execute */
324
325 /**
326 * @brief Free resources allocated for a 9p request
327 *
328 * This does not free the request itself.
329 *
330 * @param[in] req9p 9p request
331 */
332 static void _9p_free_reqdata(struct _9p_request_data *req9p)
333 {
334 if (req9p->pconn->trans_type == _9P_TCP)
335 gsh_free(req9p->_9pmsg);
336
337 /* decrease connection refcount */
338 (void) atomic_dec_uint32_t(&req9p->pconn->refcount);
339 }
340
341 static uint32_t worker_indexer;
342
343 /**
344 * @brief Initialize a worker thread
345 *
346 * @param[in] ctx Thread fridge context
347 */
348
349 static void worker_thread_initializer(struct fridgethr_context *ctx)
350 {
351 struct _9p_worker_data *wd = &ctx->wd;
352 char thr_name[32];
353
354 wd->worker_index = atomic_inc_uint32_t(&worker_indexer);
355 snprintf(thr_name, sizeof(thr_name), "work-%u", wd->worker_index);
356 SetNameFunction(thr_name);
357
358 /* Initialize thr waitq */
359 init_wait_q_entry(&wd->wqe);
360 }
361
362 /**
363 * @brief Finalize a worker thread
364 *
365 * @param[in] ctx Thread fridge context
366 */
367
368 static void worker_thread_finalizer(struct fridgethr_context *ctx)
369 {
370 ctx->thread_info = NULL;
371 }
372
373 /**
374 * @brief The main function for a worker thread
375 *
376 * This is the body of the worker thread. Its starting arguments are
377 * located in global array worker_data. The argument is no pointer but
378 * the worker's index. It then uses this index to address its own
379 * worker data in the array.
380 *
381 * @param[in] ctx Fridge thread context
382 */
383
384 static void _9p_worker_run(struct fridgethr_context *ctx)
385 {
386 struct _9p_worker_data *worker_data = &ctx->wd;
387 struct _9p_request_data *reqdata;
388 pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
389 pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
390
391 /* Worker's loop */
392 while (!fridgethr_you_should_break(ctx)) {
393 reqdata = _9p_dequeue_req(worker_data);
394
395 if (!reqdata)
396 continue;
397
398 reqdata->mutex = &mutex;
399 reqdata->cond = &cond;
400
401 _9p_execute(reqdata);
402 _9p_free_reqdata(reqdata);
403
404 /* Free the req by releasing the entry */
405 LogFullDebug(COMPONENT_DISPATCH,
406 "Invalidating processed entry");
407
408 gsh_free(reqdata);
409 (void) atomic_inc_uint64_t(&nfs_health_.dequeued_reqs);
410 }
411
412 PTHREAD_MUTEX_destroy(&mutex);
413 PTHREAD_COND_destroy(&cond);
414 }
415
416 int _9p_worker_init(void)
417 {
418 struct fridgethr_params frp;
419 struct req_q_pair *qpair;
420 int ix;
421 int rc = 0;
422
423 /* Init request queue before workers */
424 pthread_spin_init(&_9p_req_st.reqs.sp, PTHREAD_PROCESS_PRIVATE);
425 _9p_req_st.reqs.size = 0;
426 for (ix = 0; ix < N_REQ_QUEUES; ++ix) {
427 qpair = &(_9p_req_st.reqs._9p_request_q.qset[ix]);
428 qpair->s = req_q_s[ix];
429 _9p_rpc_q_init(&qpair->producer);
430 _9p_rpc_q_init(&qpair->consumer);
431 }
432
433 /* waitq */
434 glist_init(&_9p_req_st.reqs.wait_list);
435 _9p_req_st.reqs.waiters = 0;
436
437 memset(&frp, 0, sizeof(struct fridgethr_params));
438 frp.thr_max = _9p_param.nb_worker;
439 frp.thr_min = _9p_param.nb_worker;
440 frp.flavor = fridgethr_flavor_looper;
441 frp.thread_initialize = worker_thread_initializer;
442 frp.thread_finalize = worker_thread_finalizer;
443 frp.wake_threads = _9p_queue_awaken;
444 frp.wake_threads_arg = &_9p_req_st;
445
446 rc = fridgethr_init(&_9p_worker_fridge, "9P", &frp);
447 if (rc != 0) {
448 LogMajor(COMPONENT_DISPATCH,
449 "Unable to initialize worker fridge: %d", rc);
450 return rc;
451 }
452
453 rc = fridgethr_populate(_9p_worker_fridge, _9p_worker_run, NULL);
454
455 if (rc != 0) {
456 LogMajor(COMPONENT_DISPATCH,
457 "Unable to populate worker fridge: %d", rc);
458 }
459
460 return rc;
461 }
462
463 int _9p_worker_shutdown(void)
464 {
465 int rc;
466
467 if (!_9p_worker_fridge)
468 return 0;
469
470 rc = fridgethr_sync_command(_9p_worker_fridge, fridgethr_comm_stop,
471 120);
472
473 if (rc == ETIMEDOUT) {
474 LogMajor(COMPONENT_DISPATCH,
475 "Shutdown timed out, cancelling threads.");
476 fridgethr_cancel(_9p_worker_fridge);
477 } else if (rc != 0) {
478 LogMajor(COMPONENT_DISPATCH,
479 "Failed shutting down worker threads: %d", rc);
480 }
481 return rc;
482 }
483
484 void DispatchWork9P(struct _9p_request_data *req)
485 {
486 switch (req->pconn->trans_type) {
487 case _9P_TCP:
488 LogDebug(COMPONENT_DISPATCH,
489 "Dispatching 9P/TCP request %p, tcpsock=%lu",
490 req, req->pconn->trans_data.sockfd);
491 break;
492
493 case _9P_RDMA:
494 LogDebug(COMPONENT_DISPATCH,
495 "Dispatching 9P/RDMA request %p", req);
496 break;
497
498 default:
499 LogCrit(COMPONENT_DISPATCH,
500 "/!\\ Implementation error, bad 9P transport type");
501 return;
502 }
503
504 /* increase connection refcount */
505 (void) atomic_inc_uint32_t(&req->pconn->refcount);
506
507 /* new-style dispatch */
508 _9p_enqueue_req(req);
509 }
510
511 /**
512 * _9p_socket_thread: 9p socket manager.
513 *
514 * This function is the main loop for the 9p socket manager.
515 * One such thread exists per connection.
516 *
517 * @param Arg the socket number cast as a void * in pthread_create
518 *
519 * @return NULL
520 *
521 */
522
523 void *_9p_socket_thread(void *Arg)
524 {
525 long int tcp_sock = (long int)Arg;
526 int rc = -1;
527 struct pollfd fds[1];
528 int fdcount = 1;
529 static char my_name[MAXNAMLEN + 1];
530 char strcaller[INET6_ADDRSTRLEN];
531 struct _9p_request_data *req = NULL;
532 int tag;
533 unsigned long sequence = 0;
534 unsigned int i = 0;
535 char *_9pmsg = NULL;
536 uint32_t msglen;
537
538 struct _9p_conn _9p_conn;
539 socklen_t addrpeerlen;
540
541 int readlen = 0;
542 int total_readlen = 0;
543
544 snprintf(my_name, MAXNAMLEN, "9p_sock_mgr#fd=%ld", tcp_sock);
545 SetNameFunction(my_name);
546 rcu_register_thread();
547
548 /* Init the struct _9p_conn structure */
549 memset(&_9p_conn, 0, sizeof(_9p_conn));
550 PTHREAD_MUTEX_init(&_9p_conn.sock_lock, NULL);
551 _9p_conn.trans_type = _9P_TCP;
552 _9p_conn.trans_data.sockfd = tcp_sock;
553 for (i = 0; i < FLUSH_BUCKETS; i++) {
554 PTHREAD_MUTEX_init(&_9p_conn.flush_buckets[i].lock, NULL);
555 glist_init(&_9p_conn.flush_buckets[i].list);
556 }
557 atomic_store_uint32_t(&_9p_conn.refcount, 0);
558
559 /* Init the fids pointers array */
560 memset(&_9p_conn.fids, 0, _9P_FID_PER_CONN * sizeof(struct _9p_fid *));
561
562 /* Set initial msize.
563 * Client may request a lower value during TVERSION */
564 _9p_conn.msize = _9p_param._9p_tcp_msize;
565
566 if (gettimeofday(&_9p_conn.birth, NULL) == -1)
567 LogFatal(COMPONENT_9P, "Cannot get connection's time of birth");
568
569 addrpeerlen = sizeof(_9p_conn.addrpeer);
570 rc = getpeername(tcp_sock, (struct sockaddr *)&_9p_conn.addrpeer,
571 &addrpeerlen);
572 if (rc == -1) {
573 LogMajor(COMPONENT_9P,
574 "Cannot get peername to tcp socket for 9p, error %d (%s)",
575 errno, strerror(errno));
576 /* XXX */
577 strlcpy(strcaller, "(unresolved)", INET6_ADDRSTRLEN);
578 goto end;
579 } else {
580 switch (_9p_conn.addrpeer.ss_family) {
581 case AF_INET:
582 inet_ntop(_9p_conn.addrpeer.ss_family,
583 &((struct sockaddr_in *)&_9p_conn.addrpeer)->
584 sin_addr, strcaller, INET6_ADDRSTRLEN);
585 break;
586 case AF_INET6:
587 inet_ntop(_9p_conn.addrpeer.ss_family,
588 &((struct sockaddr_in6 *)&_9p_conn.addrpeer)->
589 sin6_addr, strcaller, INET6_ADDRSTRLEN);
590 break;
591 default:
592 snprintf(strcaller, INET6_ADDRSTRLEN, "BAD ADDRESS");
593 break;
594 }
595
596 LogEvent(COMPONENT_9P, "9p socket #%ld is connected to %s",
597 tcp_sock, strcaller);
598 }
599 _9p_conn.client = get_gsh_client(&_9p_conn.addrpeer, false);
600
601 /* Set up the structure used by poll */
602 memset((char *)fds, 0, sizeof(struct pollfd));
603 fds[0].fd = tcp_sock;
604 fds[0].events =
605 POLLIN | POLLPRI | POLLRDBAND | POLLRDNORM | POLLRDHUP | POLLHUP |
606 POLLERR | POLLNVAL;
607
608 for (;;) {
609 total_readlen = 0; /* new message */
610 rc = poll(fds, fdcount, -1);
611 if (rc == -1) {
612 /* timeout = -1 => Wait indefinitely for events */
613 /* Interruption if not an issue */
614 if (errno == EINTR)
615 continue;
616
617 LogCrit(COMPONENT_9P,
618 "Got error %u (%s) on fd %ld connect to %s while polling on socket",
619 errno, strerror(errno), tcp_sock, strcaller);
620 }
621
622 if (fds[0].revents & POLLNVAL) {
623 LogEvent(COMPONENT_9P,
624 "Client %s on socket %lu produced POLLNVAL",
625 strcaller, tcp_sock);
626 goto end;
627 }
628
629 if (fds[0].revents & (POLLERR | POLLHUP | POLLRDHUP)) {
630 LogEvent(COMPONENT_9P,
631 "Client %s on socket %lu has shut down and closed",
632 strcaller, tcp_sock);
633 goto end;
634 }
635
636 if (!(fds[0].revents & (POLLIN | POLLRDNORM)))
637 continue;
638
639 /* Prepare to read the message */
640 _9pmsg = gsh_malloc(_9p_conn.msize);
641
642 /* An incoming 9P request: the msg has a 4 bytes header
643 showing the size of the msg including the header */
644 readlen = recv(fds[0].fd, _9pmsg,
645 _9P_HDR_SIZE, MSG_WAITALL);
646 if (readlen != _9P_HDR_SIZE)
647 goto badmsg;
648
649 msglen = *(uint32_t *) _9pmsg;
650 if (msglen > _9p_conn.msize) {
651 LogCrit(COMPONENT_9P,
652 "Message size too big! got %u, max = %u",
653 msglen, _9p_conn.msize);
654 goto end;
655 }
656
657 LogFullDebug(COMPONENT_9P,
658 "Received 9P/TCP message of size %u from client %s on socket %lu",
659 msglen, strcaller, tcp_sock);
660
661 total_readlen += readlen;
662 while (total_readlen < msglen) {
663 readlen = recv(fds[0].fd,
664 _9pmsg + total_readlen,
665 msglen - total_readlen,
666 0);
667
668 if (readlen > 0) {
669 total_readlen += readlen;
670 continue;
671 }
672 if (readlen == 0 ||
673 (readlen < 0 && errno != EINTR))
674 goto badmsg;
675 } /* while */
676
677 server_stats_transport_done(_9p_conn.client,
678 total_readlen, 1, 0,
679 0, 0, 0);
680
681 /* Message is good. */
682 (void) atomic_inc_uint64_t(&nfs_health_.enqueued_reqs);
683 req = gsh_calloc(1, sizeof(struct _9p_request_data));
684
685 req->_9pmsg = _9pmsg;
686 req->pconn = &_9p_conn;
687
688 /* Add this request to the request list,
689 * should it be flushed later. */
690 tag = *(u16 *) (_9pmsg + _9P_HDR_SIZE + _9P_TYPE_SIZE);
691 _9p_AddFlushHook(req, tag, sequence++);
692 LogFullDebug(COMPONENT_9P,
693 "Request tag is %d\n", tag);
694
695 /* Message was OK push it */
696 DispatchWork9P(req);
697
698 /* Not our buffer anymore */
699 _9pmsg = NULL;
700 continue;
701
702 badmsg:
703 if (readlen == 0)
704 LogEvent(COMPONENT_9P,
705 "Premature end for Client %s on socket %lu, total read = %u",
706 strcaller, tcp_sock, total_readlen);
707 else if (readlen < 0) {
708 LogEvent(COMPONENT_9P,
709 "Read error client %s on socket %lu errno=%d, total read = %u",
710 strcaller, tcp_sock,
711 errno, total_readlen);
712 } else
713 LogEvent(COMPONENT_9P,
714 "Header too small! for client %s on socket %lu: readlen=%u expected=%u",
715 strcaller, tcp_sock, readlen,
716 _9P_HDR_SIZE);
717
718 /* Either way, we close the connection.
719 * It is not possible to survive
720 * once we get out of sync in the TCP stream
721 * with the client
722 */
723 break; /* bail out */
724 } /* for( ;; ) */
725
726 end:
727 LogEvent(COMPONENT_9P, "Closing connection on socket %lu", tcp_sock);
728 close(tcp_sock);
729
730 /* Free buffer if we encountered an error
731 * before we could give it to a worker */
732 if (_9pmsg)
733 gsh_free(_9pmsg);
734
735 while (atomic_fetch_uint32_t(&_9p_conn.refcount)) {
736 LogEvent(COMPONENT_9P, "Waiting for workers to release pconn");
737 sleep(1);
738 }
739
740 _9p_cleanup_fids(&_9p_conn);
741
742 if (_9p_conn.client != NULL)
743 put_gsh_client(_9p_conn.client);
744
745 rcu_unregister_thread();
746 pthread_exit(NULL);
747 } /* _9p_socket_thread */
748
749 /**
750 * _9p_create_socket_V4 : create the socket and bind for 9P using
751 * the available V4 interfaces on the host. This is not the default
752 * for ganesha. We expect _9p_create_socket_V6 to be called first
753 * and succeed. Only when the V6 function returns failure is this
754 * function expected to be called. See _9p_create_socket().
755 *
756 * @return socket fd or -1 in case of failure
757 *
758 */
759 static int _9p_create_socket_V4(void)
760 {
761 int sock = -1;
762 int one = 1;
763 int centvingt = 120;
764 int neuf = 9;
765 struct netbuf netbuf_tcp;
766 struct t_bind bindaddr_tcp;
767 struct __rpc_sockinfo si_tcp;
768 struct sockaddr_in sinaddr_tcp;
769
770 sock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
771 if (sock == -1) {
772 LogWarn(COMPONENT_9P_DISPATCH,
773 "Error creating 9p V4 socket, error %d(%s)",
774 errno, strerror(errno));
775 return -1;
776 }
777
778 if ((setsockopt(sock,
779 SOL_SOCKET, SO_REUSEADDR,
780 &one, sizeof(one)) == -1) ||
781 (setsockopt(sock,
782 IPPROTO_TCP, TCP_NODELAY,
783 &one, sizeof(one)) == -1) ||
784 (setsockopt(sock,
785 IPPROTO_TCP, TCP_KEEPIDLE,
786 ¢vingt, sizeof(centvingt)) == -1) ||
787 (setsockopt(sock,
788 IPPROTO_TCP, TCP_KEEPINTVL,
789 ¢vingt, sizeof(centvingt)) == -1) ||
790 (setsockopt(sock,
791 IPPROTO_TCP, TCP_KEEPCNT,
792 &neuf, sizeof(neuf)) == -1)) {
793 LogWarn(COMPONENT_9P_DISPATCH,
794 "Error setting 9p V4 socket option, error %d(%s)",
795 errno, strerror(errno));
796 goto err;
797 }
798
799 memset(&sinaddr_tcp, 0, sizeof(sinaddr_tcp));
800 sinaddr_tcp.sin_family = AF_INET;
801
802 /* All the interfaces on the machine are used */
803 sinaddr_tcp.sin_addr.s_addr = htonl(INADDR_ANY);
804 sinaddr_tcp.sin_port = htons(_9p_param._9p_tcp_port);
805
806 netbuf_tcp.maxlen = sizeof(sinaddr_tcp);
807 netbuf_tcp.len = sizeof(sinaddr_tcp);
808 netbuf_tcp.buf = &sinaddr_tcp;
809
810 bindaddr_tcp.qlen = SOMAXCONN;
811 bindaddr_tcp.addr = netbuf_tcp;
812
813 if (!__rpc_fd2sockinfo(sock, &si_tcp)) {
814 LogWarn(COMPONENT_9P_DISPATCH,
815 "Cannot get 9p socket info for tcp V4 socket error %d(%s)",
816 errno, strerror(errno));
817 goto err;
818 }
819
820 if (bind(sock,
821 (struct sockaddr *)bindaddr_tcp.addr.buf,
822 (socklen_t) si_tcp.si_alen) == -1) {
823 LogWarn(COMPONENT_9P_DISPATCH,
824 "Cannot bind 9p tcp V4 socket, error %d(%s)", errno,
825 strerror(errno));
826 goto err;
827 }
828
829 if (listen(sock, 20) == -1) {
830 LogWarn(COMPONENT_9P_DISPATCH,
831 "Cannot bind 9p tcp V4 socket, error %d(%s)", errno,
832 strerror(errno));
833 goto err;
834 }
835
836 return sock;
837
838 err:
839
840 close(sock);
841 return -1;
842 }
843
844 /**
845 * _9p_create_socket_V6 : create the socket and bind for 9P using
846 * the available V6 interfaces on the host
847 *
848 * @return socket fd or -1 in case of failure
849 *
850 */
851 static int _9p_create_socket_V6(void)
852 {
853 int sock = -1;
854 int one = 1;
855 int centvingt = 120;
856 int neuf = 9;
857 struct sockaddr_in6 sinaddr_tcp6;
858 struct netbuf netbuf_tcp6;
859 struct t_bind bindaddr_tcp6;
860 struct __rpc_sockinfo si_tcp6;
861
862 sock = socket(P_FAMILY, SOCK_STREAM, IPPROTO_TCP);
863 if (sock == -1) {
864 if (errno == EAFNOSUPPORT) {
865 LogWarn(COMPONENT_9P_DISPATCH,
866 "Error creating socket, V6 intfs disabled? error %d(%s)",
867 errno, strerror(errno));
868 return _9p_create_socket_V4();
869 }
870
871 return -1;
872 }
873
874 if ((setsockopt(sock,
875 SOL_SOCKET, SO_REUSEADDR,
876 &one, sizeof(one)) == -1) ||
877 (setsockopt(sock,
878 IPPROTO_TCP, TCP_NODELAY,
879 &one, sizeof(one)) == -1) ||
880 (setsockopt(sock,
881 IPPROTO_TCP, TCP_KEEPIDLE,
882 ¢vingt, sizeof(centvingt)) == -1) ||
883 (setsockopt(sock,
884 IPPROTO_TCP, TCP_KEEPINTVL,
885 ¢vingt, sizeof(centvingt)) == -1) ||
886 (setsockopt(sock,
887 IPPROTO_TCP, TCP_KEEPCNT,
888 &neuf, sizeof(neuf)) == -1)) {
889 LogWarn(COMPONENT_9P_DISPATCH,
890 "Error setting V6 socket option, error %d(%s)",
891 errno, strerror(errno));
892 goto err;
893 }
894
895 memset(&sinaddr_tcp6, 0, sizeof(sinaddr_tcp6));
896 sinaddr_tcp6.sin6_family = AF_INET6;
897 /* All the interfaces on the machine are used */
898 sinaddr_tcp6.sin6_addr = in6addr_any;
899 sinaddr_tcp6.sin6_port = htons(_9p_param._9p_tcp_port);
900
901 netbuf_tcp6.maxlen = sizeof(sinaddr_tcp6);
902 netbuf_tcp6.len = sizeof(sinaddr_tcp6);
903 netbuf_tcp6.buf = &sinaddr_tcp6;
904
905 bindaddr_tcp6.qlen = SOMAXCONN;
906 bindaddr_tcp6.addr = netbuf_tcp6;
907
908 if (!__rpc_fd2sockinfo(sock, &si_tcp6)) {
909 LogWarn(COMPONENT_9P_DISPATCH,
910 "Cannot get 9p socket info for tcp6 socket error %d(%s)",
911 errno, strerror(errno));
912 goto err;
913 }
914
915 if (bind(sock,
916 (struct sockaddr *)bindaddr_tcp6.addr.buf,
917 (socklen_t) si_tcp6.si_alen) == -1) {
918 LogWarn(COMPONENT_9P_DISPATCH,
919 "Cannot bind 9p tcp6 socket, error %d (%s)", errno,
920 strerror(errno));
921 goto err;
922 }
923
924 if (listen(sock, 20) == -1) {
925 LogWarn(COMPONENT_9P_DISPATCH,
926 "Cannot bind 9p tcp6 socket, error %d (%s)", errno,
927 strerror(errno));
928 goto err;
929 }
930
931 return sock;
932
933 err:
934
935 close(sock);
936 return -1;
937 }
938
939 /**
940 * _9p_dispatcher_thread: thread used for RPC dispatching.
941 *
942 * This function is the main loop for the 9p dispatcher.
943 * It never returns because it is an infinite loop.
944 *
945 * @param Arg (unused)
946 *
947 * @return Pointer to the result (but this function will mostly loop forever).
948 *
949 */
950 void *_9p_dispatcher_thread(void *Arg)
951 {
952 int _9p_socket;
953 int rc = 0;
954 long int newsock = -1;
955 pthread_attr_t attr_thr;
956 pthread_t tcp_thrid;
957
958 SetNameFunction("_9p_disp");
959
960 rcu_register_thread();
961
962 /* Calling dispatcher main loop */
963 LogInfo(COMPONENT_9P_DISPATCH, "Entering 9P dispatcher");
964
965 LogDebug(COMPONENT_9P_DISPATCH, "My pthread id is %p",
966 (void *) pthread_self());
967
968 /* Set up the _9p_socket (trying V6 first, will fall back to V4
969 * if V6 fails).
970 */
971 _9p_socket = _9p_create_socket_V6();
972
973 if (_9p_socket == -1) {
974 LogFatal(COMPONENT_9P_DISPATCH,
975 "Can't get socket for 9p dispatcher");
976 }
977
978 /* Init for thread parameter (mostly for scheduling) */
979 if (pthread_attr_init(&attr_thr) != 0)
980 LogDebug(COMPONENT_9P_DISPATCH,
981 "can't init pthread's attributes");
982
983 if (pthread_attr_setscope(&attr_thr, PTHREAD_SCOPE_SYSTEM) != 0)
984 LogDebug(COMPONENT_9P_DISPATCH, "can't set pthread's scope");
985
986 if (pthread_attr_setdetachstate(&attr_thr,
987 PTHREAD_CREATE_DETACHED) != 0)
988 LogDebug(COMPONENT_9P_DISPATCH,
989 "can't set pthread's join state");
990
991 LogEvent(COMPONENT_9P_DISPATCH, "9P dispatcher started");
992
993 while (true) {
994 newsock = accept(_9p_socket, NULL, NULL);
995
996 if (newsock < 0) {
997 LogCrit(COMPONENT_9P_DISPATCH, "accept failed: %d",
998 errno);
999 continue;
1000 }
1001
1002 /* Starting the thread dedicated to signal handling */
1003 rc = pthread_create(&tcp_thrid, &attr_thr,
1004 _9p_socket_thread, (void *)newsock);
1005 if (rc != 0) {
1006 LogFatal(COMPONENT_THREAD,
1007 "Could not create 9p socket manager thread, error = %d (%s)",
1008 errno, strerror(errno));
1009 }
1010 } /* while */
1011
1012 close(_9p_socket);
1013 pthread_attr_destroy(&attr_thr);
1014
1015 rcu_unregister_thread();
1016 return NULL;
1017 } /* _9p_dispatcher_thread */
1018