1 /*
2 * Copyright (c) 2012-2018 Red Hat, Inc. and/or its affiliates.
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 * 1. Redistributions of source code must retain the above copyright
9 * notice, this list of conditions and the following disclaimer.
10 * 2. Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 *
14 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
15 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
16 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
17 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
18 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
19 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
20 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
21 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
22 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
23 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
24 */
25
26 #include "config.h"
27
28 #include <sys/types.h>
29 #include <sys/poll.h>
30 #include <stdint.h>
31 #include <assert.h>
32 #include <err.h>
33 #include <errno.h>
34 #include <unistd.h>
35 #include <fcntl.h>
36 #include <signal.h>
37
38 #include <rpc/types.h>
39 #include <misc/portable.h>
40 #include <rpc/rpc.h>
41 #include <rpc/svc_rqst.h>
42
43 #include "rpc_com.h"
44
45 #include <rpc/svc.h>
46 #include <misc/rbtree_x.h>
47 #include <misc/opr_queue.h>
48 #include <misc/timespec.h>
49 #include "clnt_internal.h"
50 #include "svc_internal.h"
51 #include "svc_xprt.h"
52 #include <rpc/svc_auth.h>
53
54 /**
55 * @file svc_rqst.c
56 * @author William Allen Simpson <bill@cohortfs.com>
57 * @brief Multi-channel event signal package
58 *
59 * @section DESCRIPTION
60 *
61 * Maintains a list of all extant transports by event (channel) id.
62 *
63 * Each SVCXPRT points to its own handler, however, so operations to
64 * block/unblock events (for example) given an existing xprt handle
65 * are O(1) without any ordered or hashed representation.
66 */
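/*
 * Usage sketch (illustrative only, not part of the build): a caller
 * typically creates a channel and then binds an already-created SVCXPRT
 * to it with the public entry points defined below; error handling is
 * elided and "xprt" is assumed to exist.
 *
 *	uint32_t chan_id;
 *
 *	if (svc_rqst_new_evchan(&chan_id, NULL, SVC_RQST_FLAG_NONE) == 0)
 *		(void)svc_rqst_evchan_reg(chan_id, xprt, SVC_RQST_FLAG_NONE);
 */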
67
68 #define SVC_RQST_TIMEOUT_MS (29 /* seconds (prime) was 120 */ * 1000)
69 #define SVC_RQST_WAKEUPS (1023)
70
71 /* > RPC_DPLX_LOCKED > SVC_XPRT_FLAG_LOCKED */
72 #define SVC_RQST_LOCKED 0x01000000
73 #define SVC_RQST_UNLOCK 0x02000000
74
75 static uint32_t round_robin;
76 /*static*/ uint32_t wakeups;
77
78 struct svc_rqst_rec {
79 struct work_pool_entry ev_wpe;
80 struct opr_rbtree call_expires;
81 mutex_t ev_lock;
82
83 int sv[2];
84 uint32_t id_k; /* chan id */
85
86 /*
87 * union of event processor types
88 */
89 enum svc_event_type ev_type;
90 union {
91 #if defined(TIRPC_EPOLL)
92 struct {
93 int epoll_fd;
94 struct epoll_event ctrl_ev;
95 struct epoll_event *events;
96 u_int max_events; /* max epoll events */
97 } epoll;
98 #endif
99 struct {
100 fd_set set; /* select/fd_set (currently unhooked) */
101 } fd;
102 } ev_u;
103
104 int32_t ev_refcnt;
105 uint16_t ev_flags;
106 };
107
108 struct svc_rqst_set {
109 mutex_t mtx;
110 struct svc_rqst_rec *srr;
111 uint32_t max_id;
112 uint32_t next_id;
113 };
114
115 static struct svc_rqst_set svc_rqst_set = {
116 MUTEX_INITIALIZER,
117 NULL,
118 0,
119 0,
120 };
121
122 /*
123 * Write a 4-byte value to the shared event-notification channel. As
124 * presently implemented, the value can be interpreted by only one consumer,
125 * so it is not relied upon.
126 */
127 static inline void
128 ev_sig(int fd, uint32_t sig)
129 {
130 int code = write(fd, &sig, sizeof(uint32_t));
131
132 __warnx(TIRPC_DEBUG_FLAG_SVC_RQST, "%s: fd %d sig %d", __func__, fd,
133 sig);
134 if (code < 1)
135 __warnx(TIRPC_DEBUG_FLAG_SVC_RQST,
136 "%s: error writing to event socket [%d:%d]", __func__,
137 code, errno);
138 }
139
140 /*
141 * Read a single 4-byte value from the shared event-notification channel;
142 * the socket is in non-blocking mode. The value read is returned.
143 */
144 static inline uint32_t
145 consume_ev_sig_nb(int fd)
146 {
147 uint32_t sig = 0;
148 int code __attribute__ ((unused));
149
150 code = read(fd, &sig, sizeof(uint32_t));
151 return (sig);
152 }
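/*
 * Note: by convention in this file, wakeups are written to sv[0] (see the
 * ev_sig() call sites), while the event loop registers sv[1] with epoll
 * and drains it with consume_ev_sig_nb().
 */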
153
154 static inline void
155 SetNonBlock(int fd)
156 {
157 int s_flags = fcntl(fd, F_GETFL, 0);
158 (void)fcntl(fd, F_SETFL, (s_flags | O_NONBLOCK));
159 }
160
161 void
162 svc_rqst_init(uint32_t channels)
163 {
164 mutex_lock(&svc_rqst_set.mtx);
165
166 if (svc_rqst_set.srr)
167 goto unlock;
168
169 svc_rqst_set.max_id = channels;
170 svc_rqst_set.next_id = channels;
171 svc_rqst_set.srr = mem_zalloc(channels * sizeof(struct svc_rqst_rec));
172
173 unlock:
174 mutex_unlock(&svc_rqst_set.mtx);
175 }
176
177 /**
178 * @brief Lookup a channel
179 */
180 static inline struct svc_rqst_rec *
181 svc_rqst_lookup_chan(uint32_t chan_id)
182 {
183 struct svc_rqst_rec *sr_rec;
184
185 if (chan_id >= svc_rqst_set.max_id)
186 return (NULL);
187
188 sr_rec = &svc_rqst_set.srr[chan_id];
189 if (atomic_fetch_int32_t(&sr_rec->ev_refcnt) <= 0)
190 return (NULL);
191
192 /* do not pre-increment to avoid accidental new channel */
193 atomic_inc_int32_t(&sr_rec->ev_refcnt);
194 return (sr_rec);
195 }
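/*
 * Note: a successful lookup takes a reference on the channel; callers must
 * eventually balance it with svc_rqst_release(), either directly (as in
 * svc_rqst_thrd_signal() below) or through svc_rqst_unreg().
 */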
196
197 /* forward declaration in lieu of moving code {WAS} */
198 static void svc_rqst_run_task(struct work_pool_entry *);
199 static void svc_rqst_epoll_loop(struct work_pool_entry *wpe);
200 static void svc_complete_task(struct svc_rqst_rec *sr_rec, bool finished);
201
202 static int
203 svc_rqst_expire_cmpf(const struct opr_rbtree_node *lhs,
204 const struct opr_rbtree_node *rhs)
205 {
206 struct clnt_req *lk, *rk;
207
208 lk = opr_containerof(lhs, struct clnt_req, cc_rqst);
209 rk = opr_containerof(rhs, struct clnt_req, cc_rqst);
210
211 if (lk->cc_expire_ms < rk->cc_expire_ms)
212 return (-1);
213
214 if (lk->cc_expire_ms == rk->cc_expire_ms) {
215 return (0);
216 }
217
218 return (1);
219 }
220
221 static inline int
222 svc_rqst_expire_ms(struct timespec *to)
223 {
224 struct timespec ts;
225
226 /* coarse nsec, not system time */
227 (void)clock_gettime(CLOCK_MONOTONIC_FAST, &ts);
228 timespecadd(&ts, to, &ts);
229 return timespec_ms(&ts);
230 }
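/*
 * Worked example (values assumed for illustration): with a monotonic now
 * of {100, 250000000} (100.25s) and a timeout of {2, 500000000} (2.5s),
 * timespecadd() yields {102, 750000000} and timespec_ms() returns 102750,
 * the absolute expiry in milliseconds used to order call_expires.
 */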
231
232 void
233 svc_rqst_expire_insert(struct clnt_req *cc)
234 {
235 struct cx_data *cx = CX_DATA(cc->cc_clnt);
236 struct svc_rqst_rec *sr_rec = (struct svc_rqst_rec *)cx->cx_rec->ev_p;
237 struct opr_rbtree_node *nv;
238
239 cc->cc_expire_ms = svc_rqst_expire_ms(&cc->cc_timeout);
240
241 mutex_lock(&sr_rec->ev_lock);
242 cc->cc_flags = CLNT_REQ_FLAG_EXPIRING;
243 repeat:
244 nv = opr_rbtree_insert(&sr_rec->call_expires, &cc->cc_rqst);
245 if (nv) {
246 /* add this slightly later */
247 cc->cc_expire_ms++;
248 goto repeat;
249 }
250 mutex_unlock(&sr_rec->ev_lock);
251
252 ev_sig(sr_rec->sv[0], 0); /* send wakeup */
253 }
254
255 void
256 svc_rqst_expire_remove(struct clnt_req *cc)
257 {
258 struct cx_data *cx = CX_DATA(cc->cc_clnt);
259 struct svc_rqst_rec *sr_rec = cx->cx_rec->ev_p;
260
261 mutex_lock(&sr_rec->ev_lock);
262 opr_rbtree_remove(&sr_rec->call_expires, &cc->cc_rqst);
263 mutex_unlock(&sr_rec->ev_lock);
264
265 ev_sig(sr_rec->sv[0], 0); /* send wakeup */
266 }
267
268 static void
269 svc_rqst_expire_task(struct work_pool_entry *wpe)
270 {
271 struct clnt_req *cc = opr_containerof(wpe, struct clnt_req, cc_wpe);
272
273 if (atomic_fetch_int32_t(&cc->cc_refcnt) > 1
274 && !(atomic_postset_uint16_t_bits(&cc->cc_flags,
275 CLNT_REQ_FLAG_BACKSYNC)
276 & (CLNT_REQ_FLAG_ACKSYNC | CLNT_REQ_FLAG_BACKSYNC))) {
277 /* (idempotent) cc_flags and cc_refcnt are set atomically.
278 * cc_refcnt must be greater than 1 (this task holds one reference).
279 */
280 cc->cc_error.re_status = RPC_TIMEDOUT;
281 (*cc->cc_process_cb)(cc);
282 }
283
284 clnt_req_release(cc);
285 }
286
287 int
288 svc_rqst_new_evchan(uint32_t *chan_id /* OUT */, void *u_data, uint32_t flags)
289 {
290 struct svc_rqst_rec *sr_rec;
291 uint32_t n_id;
292 int code = 0;
293 work_pool_fun_t fun = svc_rqst_run_task;
294
295 mutex_lock(&svc_rqst_set.mtx);
296 if (!svc_rqst_set.next_id) {
297 /* too many new channels, re-use global default, may be zero */
298 *chan_id =
299 svc_rqst_set.next_id = __svc_params->ev_u.evchan.id;
300 mutex_unlock(&svc_rqst_set.mtx);
301 return (0);
302 }
303 n_id = --(svc_rqst_set.next_id);
304 sr_rec = &svc_rqst_set.srr[n_id];
305
306 if (atomic_postinc_int32_t(&sr_rec->ev_refcnt) > 0) {
307 /* already exists */
308 *chan_id = n_id;
309 mutex_unlock(&svc_rqst_set.mtx);
310 return (0);
311 }
312
313 flags |= SVC_RQST_FLAG_EPOLL; /* XXX */
314
315 /* create a pair of anonymous sockets for async event channel wakeups */
316 code = socketpair(AF_UNIX, SOCK_STREAM, 0, sr_rec->sv);
317 if (code) {
318 __warnx(TIRPC_DEBUG_FLAG_ERROR,
319 "%s: failed creating event signal socketpair (%d)",
320 __func__, code);
321 ++(svc_rqst_set.next_id);
322 mutex_unlock(&svc_rqst_set.mtx);
323 return (code);
324 }
325
326 /* set non-blocking */
327 SetNonBlock(sr_rec->sv[0]);
328 SetNonBlock(sr_rec->sv[1]);
329
330 #if defined(TIRPC_EPOLL)
331 if (flags & SVC_RQST_FLAG_EPOLL) {
332 sr_rec->ev_type = SVC_EVENT_EPOLL;
333 fun = svc_rqst_epoll_loop;
334
335 /* XXX improve this too */
336 sr_rec->ev_u.epoll.max_events =
337 __svc_params->ev_u.evchan.max_events;
338 sr_rec->ev_u.epoll.events = (struct epoll_event *)
339 mem_alloc(sr_rec->ev_u.epoll.max_events *
340 sizeof(struct epoll_event));
341
342 /* create epoll fd */
343 sr_rec->ev_u.epoll.epoll_fd =
344 epoll_create_wr(sr_rec->ev_u.epoll.max_events,
345 EPOLL_CLOEXEC);
346
347 if (sr_rec->ev_u.epoll.epoll_fd == -1) {
348 __warnx(TIRPC_DEBUG_FLAG_ERROR,
349 "%s: epoll_create failed (%d)", __func__,
350 errno);
351 mem_free(sr_rec->ev_u.epoll.events,
352 sr_rec->ev_u.epoll.max_events *
353 sizeof(struct epoll_event));
354 ++(svc_rqst_set.next_id);
355 mutex_unlock(&svc_rqst_set.mtx);
356 return (EINVAL);
357 }
358
359 /* permit wakeup of threads blocked in epoll_wait, with a
360 * couple of possible semantics */
361 sr_rec->ev_u.epoll.ctrl_ev.events =
362 EPOLLIN | EPOLLRDHUP;
363 sr_rec->ev_u.epoll.ctrl_ev.data.fd = sr_rec->sv[1];
364 code =
365 epoll_ctl(sr_rec->ev_u.epoll.epoll_fd, EPOLL_CTL_ADD,
366 sr_rec->sv[1], &sr_rec->ev_u.epoll.ctrl_ev);
367 if (code == -1) {
368 code = errno;
369 __warnx(TIRPC_DEBUG_FLAG_ERROR,
370 "%s: add control socket failed (%d)", __func__,
371 code);
372 }
373 } else {
374 /* legacy fdset (currently unhooked) */
375 sr_rec->ev_type = SVC_EVENT_FDSET;
376 }
377 #else
378 sr_rec->ev_type = SVC_EVENT_FDSET;
379 #endif
380
381 *chan_id =
382 sr_rec->id_k = n_id;
383 sr_rec->ev_flags = flags & SVC_RQST_FLAG_MASK;
384 opr_rbtree_init(&sr_rec->call_expires, svc_rqst_expire_cmpf);
385 mutex_init(&sr_rec->ev_lock, NULL);
386
387 if (!code) {
388 atomic_inc_int32_t(&sr_rec->ev_refcnt);
389 sr_rec->ev_wpe.fun = fun;
390 sr_rec->ev_wpe.arg = u_data;
391 work_pool_submit(&svc_work_pool, &sr_rec->ev_wpe);
392 }
393 mutex_unlock(&svc_rqst_set.mtx);
394
395 __warnx(TIRPC_DEBUG_FLAG_SVC_RQST,
396 "%s: create evchan %d control fd pair (%d:%d)",
397 __func__, n_id,
398 sr_rec->sv[0], sr_rec->sv[1]);
399 return (code);
400 }
401
402 static inline void
403 svc_rqst_release(struct svc_rqst_rec *sr_rec)
404 {
405 if (atomic_dec_int32_t(&sr_rec->ev_refcnt) > 0)
406 return;
407
408 __warnx(TIRPC_DEBUG_FLAG_SVC_RQST,
409 "%s: remove evchan %d control fd pair (%d:%d)",
410 __func__, sr_rec->id_k,
411 sr_rec->sv[0], sr_rec->sv[1]);
412
413 mutex_destroy(&sr_rec->ev_lock);
414 }
415
416 /*
417 * may be RPC_DPLX_LOCKED, and SVC_XPRT_FLAG_ADDED cleared
418 */
419 static inline int
420 svc_rqst_unhook_events(struct rpc_dplx_rec *rec, struct svc_rqst_rec *sr_rec)
421 {
422 int code = EINVAL;
423
424 switch (sr_rec->ev_type) {
425 #if defined(TIRPC_EPOLL)
426 case SVC_EVENT_EPOLL:
427 {
428 struct epoll_event *ev = &rec->ev_u.epoll.event;
429
430 /* clear epoll vector */
431 code = epoll_ctl(sr_rec->ev_u.epoll.epoll_fd,
432 EPOLL_CTL_DEL, rec->xprt.xp_fd, ev);
433 if (code) {
434 code = errno;
435 __warnx(TIRPC_DEBUG_FLAG_WARN,
436 "%s: %p fd %d xp_refcnt %" PRId32
437 " sr_rec %p evchan %d ev_refcnt %" PRId32
438 " epoll_fd %d control fd pair (%d:%d) unhook failed (%d)",
439 __func__, rec, rec->xprt.xp_fd,
440 rec->xprt.xp_refcnt,
441 sr_rec, sr_rec->id_k, sr_rec->ev_refcnt,
442 sr_rec->ev_u.epoll.epoll_fd,
443 sr_rec->sv[0], sr_rec->sv[1], code);
444 } else {
445 __warnx(TIRPC_DEBUG_FLAG_SVC_RQST |
446 TIRPC_DEBUG_FLAG_REFCNT,
447 "%s: %p fd %d xp_refcnt %" PRId32
448 " sr_rec %p evchan %d ev_refcnt %" PRId32
449 " epoll_fd %d control fd pair (%d:%d) unhook",
450 __func__, rec, rec->xprt.xp_fd,
451 rec->xprt.xp_refcnt,
452 sr_rec, sr_rec->id_k, sr_rec->ev_refcnt,
453 sr_rec->ev_u.epoll.epoll_fd,
454 sr_rec->sv[0], sr_rec->sv[1]);
455 }
456 break;
457 }
458 #endif
459 default:
460 /* XXX formerly select/fd_set case, now placeholder for new
461 * event systems, reworked select, etc. */
462 break;
463 } /* switch */
464
465 return (code);
466 }
467
468 /*
469 * not locked
470 */
471 int
472 svc_rqst_rearm_events(SVCXPRT *xprt)
473 {
474 struct rpc_dplx_rec *rec = REC_XPRT(xprt);
475 struct svc_rqst_rec *sr_rec = (struct svc_rqst_rec *)rec->ev_p;
476 int code = EINVAL;
477
478 if (xprt->xp_flags & (SVC_XPRT_FLAG_ADDED | SVC_XPRT_FLAG_DESTROYED))
479 return (0);
480
481 /* MUST follow the destroyed check above */
482 if (sr_rec->ev_flags & SVC_RQST_FLAG_SHUTDOWN)
483 return (0);
484
485 SVC_REF(xprt, SVC_REF_FLAG_NONE);
486 rpc_dplx_rli(rec);
487
488 /* assuming success */
489 atomic_set_uint16_t_bits(&xprt->xp_flags, SVC_XPRT_FLAG_ADDED);
490
491 switch (sr_rec->ev_type) {
492 #if defined(TIRPC_EPOLL)
493 case SVC_EVENT_EPOLL:
494 {
495 struct epoll_event *ev = &rec->ev_u.epoll.event;
496
497 /* set up epoll user data */
498 ev->events = EPOLLIN | EPOLLONESHOT;
499
500 /* rearm in epoll vector */
501 code = epoll_ctl(sr_rec->ev_u.epoll.epoll_fd,
502 EPOLL_CTL_MOD, xprt->xp_fd, ev);
503 if (code) {
504 code = errno;
505 atomic_clear_uint16_t_bits(&xprt->xp_flags,
506 SVC_XPRT_FLAG_ADDED);
507 __warnx(TIRPC_DEBUG_FLAG_ERROR,
508 "%s: %p fd %d xp_refcnt %" PRId32
509 " sr_rec %p evchan %d ev_refcnt %" PRId32
510 " epoll_fd %d control fd pair (%d:%d) rearm failed (%d)",
511 __func__, rec, rec->xprt.xp_fd,
512 rec->xprt.xp_refcnt,
513 sr_rec, sr_rec->id_k, sr_rec->ev_refcnt,
514 sr_rec->ev_u.epoll.epoll_fd,
515 sr_rec->sv[0], sr_rec->sv[1], code);
516 SVC_RELEASE(xprt, SVC_RELEASE_FLAG_NONE);
517 } else {
518 __warnx(TIRPC_DEBUG_FLAG_SVC_RQST |
519 TIRPC_DEBUG_FLAG_REFCNT,
520 "%s: %p fd %d xp_refcnt %" PRId32
521 " sr_rec %p evchan %d ev_refcnt %" PRId32
522 " epoll_fd %d control fd pair (%d:%d) rearm",
523 __func__, rec, rec->xprt.xp_fd,
524 rec->xprt.xp_refcnt,
525 sr_rec, sr_rec->id_k, sr_rec->ev_refcnt,
526 sr_rec->ev_u.epoll.epoll_fd,
527 sr_rec->sv[0], sr_rec->sv[1]);
528 }
529 break;
530 }
531 #endif
532 default:
533 /* XXX formerly select/fd_set case, now placeholder for new
534 * event systems, reworked select, etc. */
535 break;
536 } /* switch */
537
538 rpc_dplx_rui(rec);
539
540 return (code);
541 }
542
543 /*
544 * RPC_DPLX_LOCKED, and SVC_XPRT_FLAG_ADDED set
545 */
546 static inline int
547 svc_rqst_hook_events(struct rpc_dplx_rec *rec, struct svc_rqst_rec *sr_rec)
548 {
549 int code = EINVAL;
550
551 switch (sr_rec->ev_type) {
552 #if defined(TIRPC_EPOLL)
553 case SVC_EVENT_EPOLL:
554 {
555 struct epoll_event *ev = &rec->ev_u.epoll.event;
556
557 /* set up epoll user data */
558 ev->data.ptr = rec;
559
560 /* wait for read events, level triggered, oneshot */
561 ev->events = EPOLLIN | EPOLLONESHOT;
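/* note: EPOLLONESHOT disarms the descriptor after one event; it is
 * re-enabled later via EPOLL_CTL_MOD in svc_rqst_rearm_events().
 */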
562
563 /* add to epoll vector */
564 code = epoll_ctl(sr_rec->ev_u.epoll.epoll_fd,
565 EPOLL_CTL_ADD, rec->xprt.xp_fd, ev);
566 if (code) {
567 code = errno;
568 atomic_clear_uint16_t_bits(&rec->xprt.xp_flags,
569 SVC_XPRT_FLAG_ADDED);
570 __warnx(TIRPC_DEBUG_FLAG_ERROR,
571 "%s: %p fd %d xp_refcnt %" PRId32
572 " sr_rec %p evchan %d ev_refcnt %" PRId32
573 " epoll_fd %d control fd pair (%d:%d) hook failed (%d)",
574 __func__, rec, rec->xprt.xp_fd,
575 rec->xprt.xp_refcnt,
576 sr_rec, sr_rec->id_k, sr_rec->ev_refcnt,
577 sr_rec->ev_u.epoll.epoll_fd,
578 sr_rec->sv[0], sr_rec->sv[1], code);
579 } else {
580 __warnx(TIRPC_DEBUG_FLAG_SVC_RQST |
581 TIRPC_DEBUG_FLAG_REFCNT,
582 "%s: %p fd %d xp_refcnt %" PRId32
583 " sr_rec %p evchan %d ev_refcnt %" PRId32
584 " epoll_fd %d control fd pair (%d:%d) hook",
585 __func__, rec, rec->xprt.xp_fd,
586 rec->xprt.xp_refcnt,
587 sr_rec, sr_rec->id_k, sr_rec->ev_refcnt,
588 sr_rec->ev_u.epoll.epoll_fd,
589 sr_rec->sv[0], sr_rec->sv[1]);
590 }
591 break;
592 }
593 #endif
594 default:
595 /* XXX formerly select/fd_set case, now placeholder for new
596 * event systems, reworked select, etc. */
597 break;
598 } /* switch */
599
600 ev_sig(sr_rec->sv[0], 0); /* send wakeup */
601
602 return (code);
603 }
604
605 /*
606 * RPC_DPLX_LOCKED
607 */
608 static void
609 svc_rqst_unreg(struct rpc_dplx_rec *rec, struct svc_rqst_rec *sr_rec)
610 {
611 uint16_t xp_flags = atomic_postclear_uint16_t_bits(&rec->xprt.xp_flags,
612 SVC_XPRT_FLAG_ADDED);
613
614 /* clear events */
615 if (xp_flags & SVC_XPRT_FLAG_ADDED)
616 (void)svc_rqst_unhook_events(rec, sr_rec);
617
618 /* Unlinking after debug message ensures both the xprt and the sr_rec
619 * are still present, as the xprt unregisters before release.
620 */
621 rec->ev_p = NULL;
622 svc_rqst_release(sr_rec);
623 }
624
625 /*
626 * flags indicate locking state
627 */
628 int
629 svc_rqst_evchan_reg(uint32_t chan_id, SVCXPRT *xprt, uint32_t flags)
630 {
631 struct rpc_dplx_rec *rec = REC_XPRT(xprt);
632 struct svc_rqst_rec *sr_rec;
633 struct svc_rqst_rec *ev_p;
634 int code;
635 uint16_t bits = SVC_XPRT_FLAG_ADDED | (flags & SVC_XPRT_FLAG_UREG);
636
637 if (chan_id == 0) {
638 /* Create a global/legacy event channel */
639 code = svc_rqst_new_evchan(&(__svc_params->ev_u.evchan.id),
640 NULL /* u_data */ ,
641 SVC_RQST_FLAG_CHAN_AFFINITY);
642 if (code) {
643 __warnx(TIRPC_DEBUG_FLAG_ERROR,
644 "%s: %p failed to create global/legacy channel (%d)",
645 __func__, xprt, code);
646 return (code);
647 }
648 chan_id = __svc_params->ev_u.evchan.id;
649 }
650
651 sr_rec = svc_rqst_lookup_chan(chan_id);
652 if (!sr_rec) {
653 __warnx(TIRPC_DEBUG_FLAG_ERROR,
654 "%s: %p unknown evchan %d",
655 __func__, xprt, chan_id);
656 return (ENOENT);
657 }
658
659 if (!(flags & RPC_DPLX_LOCKED))
660 rpc_dplx_rli(rec);
661
662 ev_p = (struct svc_rqst_rec *)rec->ev_p;
663 if (ev_p) {
664 if (ev_p == sr_rec) {
665 if (!(flags & RPC_DPLX_LOCKED))
666 rpc_dplx_rui(rec);
667 __warnx(TIRPC_DEBUG_FLAG_SVC_RQST,
668 "%s: %p already registered evchan %d",
669 __func__, xprt, chan_id);
670 return (0);
671 }
672 svc_rqst_unreg(rec, ev_p);
673 }
674
675 /* assuming success */
676 atomic_set_uint16_t_bits(&xprt->xp_flags, bits);
677
678 /* link from xprt */
679 rec->ev_p = sr_rec;
680
681 /* register on event channel */
682 code = svc_rqst_hook_events(rec, sr_rec);
683
684 if (!(flags & RPC_DPLX_LOCKED))
685 rpc_dplx_rui(rec);
686
687 return (code);
688 }
689
690 /*
691 * not locked
692 */
693 int
694 svc_rqst_xprt_register(SVCXPRT *newxprt, SVCXPRT *xprt)
695 {
696 struct svc_rqst_rec *sr_rec;
697
698 /* if no parent xprt, use global/legacy event channel */
699 if (!xprt)
700 return svc_rqst_evchan_reg(__svc_params->ev_u.evchan.id,
701 newxprt,
702 SVC_RQST_FLAG_CHAN_AFFINITY);
703
704 sr_rec = (struct svc_rqst_rec *) REC_XPRT(xprt)->ev_p;
705
706 /* or if parent xprt has no dedicated event channel */
707 if (!sr_rec)
708 return svc_rqst_evchan_reg(__svc_params->ev_u.evchan.id,
709 newxprt,
710 SVC_RQST_FLAG_CHAN_AFFINITY);
711
712 /* if round robin policy, begin with global/legacy event channel */
713 if (!(sr_rec->ev_flags & SVC_RQST_FLAG_CHAN_AFFINITY)) {
714 int code = svc_rqst_evchan_reg(round_robin, newxprt,
715 SVC_RQST_FLAG_NONE);
716
717 if (!code) {
718 /* advance round robin channel */
719 code = svc_rqst_new_evchan(&round_robin, NULL,
720 SVC_RQST_FLAG_NONE);
721 }
722 return (code);
723 }
724
725 return svc_rqst_evchan_reg(sr_rec->id_k, newxprt, SVC_RQST_FLAG_NONE);
726 }
727
728 /*
729 * flags indicate locking state
730 *
731 * @note Locking
732 * Called via svc_release_it() with once-only semantic.
733 */
734 void
735 svc_rqst_xprt_unregister(SVCXPRT *xprt, uint32_t flags)
736 {
737 struct rpc_dplx_rec *rec = REC_XPRT(xprt);
738 struct svc_rqst_rec *sr_rec = (struct svc_rqst_rec *)rec->ev_p;
739
740 /* Remove from the transport list here (and only here)
741 * before clearing the registration to ensure other
742 * lookups cannot re-use this transport.
743 */
744 if (!(flags & RPC_DPLX_LOCKED))
745 rpc_dplx_rli(rec);
746
747 svc_xprt_clear(xprt);
748
749 if (!(flags & RPC_DPLX_LOCKED))
750 rpc_dplx_rui(rec);
751
752 if (!sr_rec) {
753 /* not currently registered */
754 return;
755 }
756 svc_rqst_unreg(rec, sr_rec);
757 }
758
759 /*static*/ void
760 svc_rqst_xprt_task(struct work_pool_entry *wpe)
761 {
762 struct rpc_dplx_rec *rec =
763 opr_containerof(wpe, struct rpc_dplx_rec, ioq.ioq_wpe);
764
765 atomic_clear_uint16_t_bits(&rec->ioq.ioq_s.qflags, IOQ_FLAG_WORKING);
766
767 /* atomic barrier (above) should protect following values */
768 if (rec->xprt.xp_refcnt > 1
769 && !(rec->xprt.xp_flags & SVC_XPRT_FLAG_DESTROYED)) {
770 /* (idempotent) xp_flags and xp_refcnt are set atomically.
771 * xp_refcnt must be greater than 1 (this task holds one reference).
772 */
773 (void)clock_gettime(CLOCK_MONOTONIC_FAST, &(rec->recv.ts));
774 (void)SVC_RECV(&rec->xprt);
775 }
776
777 /* If tests fail, log non-fatal "WARNING! already destroying!" */
778 SVC_RELEASE(&rec->xprt, SVC_RELEASE_FLAG_NONE);
779 }
780
781 enum xprt_stat svc_request(SVCXPRT *xprt, XDR *xdrs)
782 {
783 enum xprt_stat stat;
784 struct svc_req *req = __svc_params->alloc_cb(xprt, xdrs);
785 struct rpc_dplx_rec *rpc_dplx_rec = REC_XPRT(xprt);
786
787 /* Track the request we are processing */
788 rpc_dplx_rec->svc_req = req;
789
790 /* All decode functions basically do a
791 * return xprt->xp_dispatch.process_cb(req);
792 */
793 stat = SVC_DECODE(req);
794
795 if (stat == XPRT_SUSPEND) {
796 * The request is suspended; don't touch it in any way
797 * because the resume may already be scheduled and running on
798 * another thread.
799 */
800 return XPRT_SUSPEND;
801 }
802
803 if (req->rq_auth)
804 SVCAUTH_RELEASE(req);
805
806 XDR_DESTROY(req->rq_xdrs);
807
808 __svc_params->free_cb(req, stat);
809
810 SVC_RELEASE(xprt, SVC_RELEASE_FLAG_NONE);
811
812 return stat;
813 }
814
815 static void svc_resume_task(struct work_pool_entry *wpe)
816 {
817 struct rpc_dplx_rec *rec =
818 opr_containerof(wpe, struct rpc_dplx_rec, ioq.ioq_wpe);
819 struct svc_req *req = rec->svc_req;
820 SVCXPRT *xprt = &rec->xprt;
821 enum xprt_stat stat;
822
823 /* Resume the request. */
824 stat = req->rq_xprt->xp_resume_cb(req);
825
826 if (stat == XPRT_SUSPEND) {
827 * The request is suspended; don't touch it in any way
828 * because the resume may already be scheduled and running on
829 * another thread.
830 */
831 return;
832 }
833
834 if (req->rq_auth)
835 SVCAUTH_RELEASE(req);
836
837 XDR_DESTROY(req->rq_xdrs);
838
839 __svc_params->free_cb(req, stat);
840
841 SVC_RELEASE(xprt, SVC_RELEASE_FLAG_NONE);
842 }
843
844 void svc_resume(struct svc_req *req)
845 {
846 struct rpc_dplx_rec *rpc_dplx_rec = REC_XPRT(req->rq_xprt);
847
848 rpc_dplx_rec->ioq.ioq_wpe.fun = svc_resume_task;
849 work_pool_submit(&svc_work_pool, &(rpc_dplx_rec->ioq.ioq_wpe));
850 }
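/*
 * Suspend/resume sketch (illustrative only, not part of the build): a
 * dispatch callback that cannot complete immediately may return
 * XPRT_SUSPEND from the decode/process path, so svc_request() or
 * svc_resume_task() returns early; once the deferred work completes, the
 * service calls svc_resume(req) to finish on a work-pool thread via
 * xp_resume_cb.  The callback and helper names here are assumptions.
 *
 *	enum xprt_stat my_process_cb(struct svc_req *req)
 *	{
 *		if (my_backend_busy(req))
 *			return (XPRT_SUSPEND);
 *		return (my_send_reply(req));
 *	}
 *
 *	(later, from the backend completion path)
 *	svc_resume(req);
 */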
851
852 /*
853 * Like __svc_clean_idle but event-type independent. For now no cleanfds.
854 */
855
856 struct svc_rqst_clean_arg {
857 struct timespec ts;
858 int timeout;
859 int cleaned;
860 };
861
862 static bool
863 svc_rqst_clean_func(SVCXPRT *xprt, void *arg)
864 {
865 struct svc_rqst_clean_arg *acc = (struct svc_rqst_clean_arg *)arg;
866
867 if (xprt->xp_ops == NULL)
868 return (false);
869
870 if (xprt->xp_flags & (SVC_XPRT_FLAG_DESTROYED | SVC_XPRT_FLAG_UREG))
871 return (false);
872
873 if ((acc->ts.tv_sec - REC_XPRT(xprt)->recv.ts.tv_sec) < acc->timeout)
874 return (false);
875
876 SVC_DESTROY(xprt);
877 acc->cleaned++;
878 return (true);
879 }
880
881 void authgss_ctx_gc_idle(void);
882
883 static void
884 svc_rqst_clean_idle(int timeout)
885 {
886 struct svc_rqst_clean_arg acc;
887 static mutex_t active_mtx = MUTEX_INITIALIZER;
888 static uint32_t active;
889
890 if (mutex_trylock(&active_mtx) != 0)
891 return;
892
893 if (active > 0)
894 goto unlock;
895
896 ++active;
897
898 #ifdef _HAVE_GSSAPI
899 /* trim gss context cache */
900 authgss_ctx_gc_idle();
901 #endif /* _HAVE_GSSAPI */
902
903 if (timeout <= 0)
904 goto unlock;
905
906 /* trim xprts (not sorted, not aggressive [but self limiting]) */
907 (void)clock_gettime(CLOCK_MONOTONIC_FAST, &acc.ts);
908 acc.timeout = timeout;
909 acc.cleaned = 0;
910
911 svc_xprt_foreach(svc_rqst_clean_func, (void *)&acc);
912
913 unlock:
914 --active;
915 mutex_unlock(&active_mtx);
916 return;
917 }
918
919 #ifdef TIRPC_EPOLL
920
921 static struct rpc_dplx_rec *
922 svc_rqst_epoll_event(struct svc_rqst_rec *sr_rec, struct epoll_event *ev)
923 {
924 struct rpc_dplx_rec *rec = (struct rpc_dplx_rec *) ev->data.ptr;
925 uint16_t xp_flags;
926
927 if (unlikely(ev->data.fd == sr_rec->sv[1])) {
928 /* signalled -- there was a wakeup on ctrl_ev (see
929 * top-of-loop) */
930 __warnx(TIRPC_DEBUG_FLAG_SVC_RQST,
931 "%s: fd %d wakeup (sr_rec %p)",
932 __func__, sr_rec->sv[1],
933 sr_rec);
934 (void)consume_ev_sig_nb(sr_rec->sv[1]);
935 __warnx(TIRPC_DEBUG_FLAG_SVC_RQST,
936 "%s: fd %d after consume sig (sr_rec %p)",
937 __func__, sr_rec->sv[1],
938 sr_rec);
939 return (NULL);
940 }
941
942 /* Another task may release the transport in parallel.
943 * We have a ref from being in epoll, but since epoll is one-shot, a new ref
944 * will be taken when we re-enter epoll. Use this ref for the processor
945 * without taking another one.
946 */
947
948 /* MUST handle flags after reference.
949 * Although another task may unhook, the error is non-fatal.
950 */
951 xp_flags = atomic_postclear_uint16_t_bits(&rec->xprt.xp_flags,
952 SVC_XPRT_FLAG_ADDED);
953
954 __warnx(TIRPC_DEBUG_FLAG_SVC_RQST |
955 TIRPC_DEBUG_FLAG_REFCNT,
956 "%s: %p fd %d xp_refcnt %" PRId32
957 " event %d",
958 __func__, rec, rec->xprt.xp_fd, rec->xprt.xp_refcnt,
959 ev->events);
960
961 if (rec->xprt.xp_refcnt > 1
962 && (xp_flags & SVC_XPRT_FLAG_ADDED)
963 && !(xp_flags & SVC_XPRT_FLAG_DESTROYED)
964 && !(atomic_postset_uint16_t_bits(&rec->ioq.ioq_s.qflags,
965 IOQ_FLAG_WORKING)
966 & IOQ_FLAG_WORKING)) {
967 /* (idempotent) xp_flags and xp_refcnt are set atomically.
968 * xp_refcnt must be greater than 1 (this event holds one reference).
969 */
970 return (rec);
971 }
972
973 /* Do not return destroyed transports.
974 * Probably log non-fatal "WARNING! already destroying!"
975 */
976 SVC_RELEASE(&rec->xprt, SVC_RELEASE_FLAG_NONE);
977 return (NULL);
978 }
979
980 /*
981 * not locked
982 */
983 static inline struct rpc_dplx_rec *
984 svc_rqst_epoll_events(struct svc_rqst_rec *sr_rec, int n_events)
985 {
986 struct rpc_dplx_rec *rec = NULL;
987 int ix = 0;
988
989 while (ix < n_events) {
990 rec = svc_rqst_epoll_event(sr_rec,
991 &(sr_rec->ev_u.epoll.events[ix++]));
992 if (rec)
993 break;
994 }
995
996 if (!rec) {
997 /* continue waiting for events with this task */
998 return NULL;
999 }
1000
1001 while (ix < n_events) {
1002 struct rpc_dplx_rec *rec = svc_rqst_epoll_event(sr_rec,
1003 &(sr_rec->ev_u.epoll.events[ix++]));
1004 if (!rec)
1005 continue;
1006
1007 rec->ioq.ioq_wpe.fun = svc_rqst_xprt_task;
1008 work_pool_submit(&svc_work_pool, &(rec->ioq.ioq_wpe));
1009 }
1010
1011 /* submit another task to handle events in order */
1012 atomic_inc_int32_t(&sr_rec->ev_refcnt);
1013 work_pool_submit(&svc_work_pool, &sr_rec->ev_wpe);
1014
1015 return rec;
1016 }
1017
1018 static void svc_rqst_epoll_loop(struct work_pool_entry *wpe)
1019 {
1020 struct svc_rqst_rec *sr_rec =
1021 opr_containerof(wpe, struct svc_rqst_rec, ev_wpe);
1022 struct clnt_req *cc;
1023 struct opr_rbtree_node *n;
1024 struct timespec ts;
1025 int timeout_ms;
1026 int expire_ms;
1027 int n_events;
1028 bool finished;
1029
1030 for (;;) {
1031 timeout_ms = SVC_RQST_TIMEOUT_MS;
1032
1033 /* coarse nsec, not system time */
1034 (void)clock_gettime(CLOCK_MONOTONIC_FAST, &ts);
1035 expire_ms = timespec_ms(&ts);
1036
1037 /* run the expire scan before epoll_wait; epoll accumulates events during the scan */
1038 mutex_lock(&sr_rec->ev_lock);
1039 while ((n = opr_rbtree_first(&sr_rec->call_expires))) {
1040 cc = opr_containerof(n, struct clnt_req, cc_rqst);
1041
1042 if (cc->cc_expire_ms > expire_ms) {
1043 timeout_ms = cc->cc_expire_ms - expire_ms;
1044 break;
1045 }
1046
1047 /* order dependent */
1048 atomic_clear_uint16_t_bits(&cc->cc_flags,
1049 CLNT_REQ_FLAG_EXPIRING);
1050 opr_rbtree_remove(&sr_rec->call_expires, &cc->cc_rqst);
1051 cc->cc_expire_ms = 0; /* atomic barrier(s) */
1052
1053 atomic_inc_uint32_t(&cc->cc_refcnt);
1054 cc->cc_wpe.fun = svc_rqst_expire_task;
1055 cc->cc_wpe.arg = NULL;
1056 work_pool_submit(&svc_work_pool, &cc->cc_wpe);
1057 }
1058 mutex_unlock(&sr_rec->ev_lock);
1059
1060 __warnx(TIRPC_DEBUG_FLAG_SVC_RQST,
1061 "%s: epoll_fd %d before epoll_wait (%d)",
1062 __func__,
1063 sr_rec->ev_u.epoll.epoll_fd,
1064 timeout_ms);
1065
1066 n_events = epoll_wait(sr_rec->ev_u.epoll.epoll_fd,
1067 sr_rec->ev_u.epoll.events,
1068 sr_rec->ev_u.epoll.max_events,
1069 timeout_ms);
1070
1071 if (unlikely(sr_rec->ev_flags & SVC_RQST_FLAG_SHUTDOWN)) {
1072 __warnx(TIRPC_DEBUG_FLAG_SVC_RQST,
1073 "%s: epoll_fd %d epoll_wait shutdown (%d)",
1074 __func__,
1075 sr_rec->ev_u.epoll.epoll_fd,
1076 n_events);
1077 finished = true;
1078 break;
1079 }
1080 if (n_events > 0) {
1081 atomic_add_uint32_t(&wakeups, n_events);
1082 struct rpc_dplx_rec *rec;
1083
1084 rec = svc_rqst_epoll_events(sr_rec, n_events);
1085
1086 if (rec != NULL) {
1087 /* use this hot thread for the first event */
1088 rec->ioq.ioq_wpe.fun = svc_rqst_xprt_task;
1089 svc_rqst_xprt_task(&(rec->ioq.ioq_wpe));
1090
1091 /* failsafe idle processing after work task */
1092 if (atomic_postclear_uint32_t_bits(
1093 &wakeups, ~SVC_RQST_WAKEUPS)
1094 > SVC_RQST_WAKEUPS) {
1095 svc_rqst_clean_idle(
1096 __svc_params->idle_timeout);
1097 }
1098 finished = false;
1099 break;
1100 }
1101 continue;
1102 }
1103 if (!n_events) {
1104 /* timed out (idle) */
1105 atomic_inc_uint32_t(&wakeups);
1106 continue;
1107 }
1108 n_events = errno;
1109 if (n_events != EINTR) {
1110 __warnx(TIRPC_DEBUG_FLAG_WARN,
1111 "%s: epoll_fd %d epoll_wait failed (%d)",
1112 __func__,
1113 sr_rec->ev_u.epoll.epoll_fd,
1114 n_events);
1115 finished = true;
1116 break;
1117 }
1118 }
1119 if (finished) {
1120 close(sr_rec->ev_u.epoll.epoll_fd);
1121 mem_free(sr_rec->ev_u.epoll.events,
1122 sr_rec->ev_u.epoll.max_events *
1123 sizeof(struct epoll_event));
1124 }
1125
1126 svc_complete_task(sr_rec, finished);
1127 }
1128 #endif
1129
1130 static void svc_complete_task(struct svc_rqst_rec *sr_rec, bool finished)
1131 {
1132 if (finished) {
1133 /* reference count here should be 2:
1134 * 1 svc_rqst_set
1135 * +1 this work_pool thread
1136 * so drop one here so that the final release below goes to 0.
1137 */
1138 atomic_dec_int32_t(&sr_rec->ev_refcnt); /* svc_rqst_set */
1139 }
1140 svc_rqst_release(sr_rec);
1141 }
1142
1143 /*
1144 * No locking, "there can be only one"
1145 */
1146 static void
1147 svc_rqst_run_task(struct work_pool_entry *wpe)
1148 {
1149 struct svc_rqst_rec *sr_rec =
1150 opr_containerof(wpe, struct svc_rqst_rec, ev_wpe);
1151
1152 /* enter event loop */
1153 switch (sr_rec->ev_type) {
1154 default:
1155 /* XXX formerly select/fd_set case, now placeholder for new
1156 * event systems, reworked select, etc. */
1157 __warnx(TIRPC_DEBUG_FLAG_ERROR,
1158 "%s: unsupported event type",
1159 __func__);
1160 break;
1161 } /* switch */
1162
1163 svc_complete_task(sr_rec, true);
1164 }
1165
1166 int
1167 svc_rqst_thrd_signal(uint32_t chan_id, uint32_t flags)
1168 {
1169 struct svc_rqst_rec *sr_rec;
1170
1171 sr_rec = svc_rqst_lookup_chan(chan_id);
1172 if (!sr_rec) {
1173 __warnx(TIRPC_DEBUG_FLAG_ERROR,
1174 "%s: unknown evchan %d",
1175 __func__, chan_id);
1176 return (ENOENT);
1177 }
1178
1179 ev_sig(sr_rec->sv[0], flags); /* send wakeup */
1180
1181 __warnx(TIRPC_DEBUG_FLAG_ERROR,
1182 "%s: signalled evchan %d",
1183 __func__, chan_id);
1184 svc_rqst_release(sr_rec);
1185 return (0);
1186 }
1187
1188 static int
1189 svc_rqst_delete_evchan(uint32_t chan_id)
1190 {
1191 struct svc_rqst_rec *sr_rec;
1192 int code = 0;
1193
1194 sr_rec = svc_rqst_lookup_chan(chan_id);
1195 if (!sr_rec) {
1196 return (ENOENT);
1197 }
1198 atomic_set_uint16_t_bits(&sr_rec->ev_flags, SVC_RQST_FLAG_SHUTDOWN);
1199 ev_sig(sr_rec->sv[0], SVC_RQST_FLAG_SHUTDOWN);
1200
1201 svc_rqst_release(sr_rec);
1202 return (code);
1203 }
1204
1205 void
1206 svc_rqst_shutdown(void)
1207 {
1208 uint32_t channels = svc_rqst_set.max_id;
1209
1210 while (channels > 0) {
1211 svc_rqst_delete_evchan(--channels);
1212 }
1213 }
1214