1    	/*
2    	 * Copyright (c) 2012-2018 Red Hat, Inc. and/or its affiliates.
3    	 * All rights reserved.
4    	 *
5    	 * Redistribution and use in source and binary forms, with or without
6    	 * modification, are permitted provided that the following conditions
7    	 * are met:
8    	 * 1. Redistributions of source code must retain the above copyright
9    	 *    notice, this list of conditions and the following disclaimer.
10   	 * 2. Redistributions in binary form must reproduce the above copyright
11   	 *    notice, this list of conditions and the following disclaimer in the
12   	 *    documentation and/or other materials provided with the distribution.
13   	 *
14   	 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
15   	 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
16   	 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
17   	 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
18   	 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
19   	 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
20   	 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
21   	 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
22   	 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
23   	 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
24   	 */
25   	
26   	#include "config.h"
27   	
28   	#include <sys/types.h>
29   	#include <sys/poll.h>
30   	#include <stdint.h>
31   	#include <assert.h>
32   	#include <err.h>
33   	#include <errno.h>
34   	#include <unistd.h>
35   	#include <fcntl.h>
36   	#include <signal.h>
37   	
38   	#include <rpc/types.h>
39   	#include <misc/portable.h>
40   	#include <rpc/rpc.h>
41   	#include <rpc/svc_rqst.h>
42   	
43   	#include "rpc_com.h"
44   	
45   	#include <rpc/svc.h>
46   	#include <misc/rbtree_x.h>
47   	#include <misc/opr_queue.h>
48   	#include <misc/timespec.h>
49   	#include "clnt_internal.h"
50   	#include "svc_internal.h"
51   	#include "svc_xprt.h"
52   	#include <rpc/svc_auth.h>
53   	
54   	/**
55   	 * @file svc_rqst.c
56   	 * @author William Allen Simpson <bill@cohortfs.com>
57   	 * @brief Multi-channel event signal package
58   	 *
59   	 * @section DESCRIPTION
60   	 *
61   	 * Maintains a list of all extant transports by event (channel) id.
62   	 *
63   	 * Each SVCXPRT points to its own handler, however, so operations to
64   	 * block/unblock events (for example) given an existing xprt handle
65   	 * are O(1) without any ordered or hashed representation.
66   	 */
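
     	/*
     	 * Usage sketch (illustrative only; "xprt" is assumed to be an existing
     	 * SVCXPRT and error handling is abbreviated): after svc_rqst_init()
     	 * has sized the channel table, a caller creates an event channel and
     	 * then registers transports on it.
     	 *
     	 *	uint32_t chan_id;
     	 *	int code;
     	 *
     	 *	code = svc_rqst_new_evchan(&chan_id, NULL,
     	 *				   SVC_RQST_FLAG_CHAN_AFFINITY);
     	 *	if (!code)
     	 *		code = svc_rqst_evchan_reg(chan_id, xprt,
     	 *					   SVC_RQST_FLAG_NONE);
     	 */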
67   	
68   	#define SVC_RQST_TIMEOUT_MS (29 /* seconds (prime) was 120 */ * 1000)
69   	#define SVC_RQST_WAKEUPS (1023)
70   	
71   	/* > RPC_DPLX_LOCKED > SVC_XPRT_FLAG_LOCKED */
72   	#define SVC_RQST_LOCKED		0x01000000
73   	#define SVC_RQST_UNLOCK		0x02000000
74   	
75   	static uint32_t round_robin;
76   	/*static*/ uint32_t wakeups;
77   	
78   	struct svc_rqst_rec {
79   		struct work_pool_entry ev_wpe;
80   		struct opr_rbtree call_expires;
81   		mutex_t ev_lock;
82   	
83   		int sv[2];
84   		uint32_t id_k;		/* chan id */
85   	
86   		/*
87   		 * union of event processor types
88   		 */
89   		enum svc_event_type ev_type;
90   		union {
91   	#if defined(TIRPC_EPOLL)
92   			struct {
93   				int epoll_fd;
94   				struct epoll_event ctrl_ev;
95   				struct epoll_event *events;
96   				u_int max_events;	/* max epoll events */
97   			} epoll;
98   	#endif
99   			struct {
100  				fd_set set;	/* select/fd_set (currently unhooked) */
101  			} fd;
102  		} ev_u;
103  	
104  		int32_t ev_refcnt;
105  		uint16_t ev_flags;
106  	};
107  	
108  	struct svc_rqst_set {
109  		mutex_t mtx;
110  		struct svc_rqst_rec *srr;
111  		uint32_t max_id;
112  		uint32_t next_id;
113  	};
114  	
115  	static struct svc_rqst_set svc_rqst_set = {
116  		MUTEX_INITIALIZER,
117  		NULL,
118  		0,
119  		0,
120  	};
121  	
122  	/*
123  	 * Write 4-byte value to shared event-notification channel.  The
124  	 * value as presently implemented can be interpreted only by one consumer,
125  	 * so is not relied on.
126  	 */
127  	static inline void
128  	ev_sig(int fd, uint32_t sig)
129  	{
130  		int code = write(fd, &sig, sizeof(uint32_t));
131  	
132  		__warnx(TIRPC_DEBUG_FLAG_SVC_RQST, "%s: fd %d sig %d", __func__, fd,
133  			sig);
134  		if (code < 1)
135  			__warnx(TIRPC_DEBUG_FLAG_SVC_RQST,
136  				"%s: error writing to event socket [%d:%d]", __func__,
137  				code, errno);
138  	}
139  	
140  	/*
141  	 * Read a single 4-byte value from the shared event-notification channel;
142  	 * the socket is in non-blocking mode.  The value read is returned.
143  	 */
144  	static inline uint32_t
145  	consume_ev_sig_nb(int fd)
146  	{
147  		uint32_t sig = 0;
148  		int code __attribute__ ((unused));
149  	
150  		code = read(fd, &sig, sizeof(uint32_t));
151  		return (sig);
152  	}
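
     	/*
     	 * Wakeup flow: producers (e.g. svc_rqst_expire_insert() or
     	 * svc_rqst_thrd_signal()) write to sr_rec->sv[0] via ev_sig(); the
     	 * channel's epoll loop sees EPOLLIN on sr_rec->sv[1], consumes the
     	 * pending signal with consume_ev_sig_nb(), and then re-evaluates its
     	 * timers and flags at the top of the loop.
     	 */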
153  	
154  	static inline void
155  	SetNonBlock(int fd)
156  	{
157  		int s_flags = fcntl(fd, F_GETFL, 0);
158  		(void)fcntl(fd, F_SETFL, (s_flags | O_NONBLOCK));
159  	}
160  	
161  	void
162  	svc_rqst_init(uint32_t channels)
163  	{
164  		mutex_lock(&svc_rqst_set.mtx);
165  	
166  		if (svc_rqst_set.srr)
167  			goto unlock;
168  	
169  		svc_rqst_set.max_id = channels;
170  		svc_rqst_set.next_id = channels;
171  		svc_rqst_set.srr = mem_zalloc(channels * sizeof(struct svc_rqst_rec));
172  	
173  	 unlock:
174  		mutex_unlock(&svc_rqst_set.mtx);
175  	}
176  	
177  	/**
178  	 * @brief Lookup a channel
179  	 */
180  	static inline struct svc_rqst_rec *
181  	svc_rqst_lookup_chan(uint32_t chan_id)
182  	{
183  		struct svc_rqst_rec *sr_rec;
184  	
185  		if (chan_id >= svc_rqst_set.max_id)
186  			return (NULL);
187  	
188  		sr_rec = &svc_rqst_set.srr[chan_id];
189  		if (atomic_fetch_int32_t(&sr_rec->ev_refcnt) <= 0)
190  			return (NULL);
191  	
192  		/* do not pre-increment to avoid accidental new channel */
193  		atomic_inc_int32_t(&sr_rec->ev_refcnt);
194  		return (sr_rec);
195  	}
196  	
197  	/* forward declaration in lieu of moving code {WAS} */
198  	static void svc_rqst_run_task(struct work_pool_entry *);
199  	static void svc_rqst_epoll_loop(struct work_pool_entry *wpe);
200  	static void svc_complete_task(struct svc_rqst_rec *sr_rec, bool finished);
201  	
202  	static int
203  	svc_rqst_expire_cmpf(const struct opr_rbtree_node *lhs,
204  			     const struct opr_rbtree_node *rhs)
205  	{
206  		struct clnt_req *lk, *rk;
207  	
208  		lk = opr_containerof(lhs, struct clnt_req, cc_rqst);
209  		rk = opr_containerof(rhs, struct clnt_req, cc_rqst);
210  	
211  		if (lk->cc_expire_ms < rk->cc_expire_ms)
212  			return (-1);
213  	
214  		if (lk->cc_expire_ms == rk->cc_expire_ms) {
215  			return (0);
216  		}
217  	
218  		return (1);
219  	}
220  	
221  	static inline int
222  	svc_rqst_expire_ms(struct timespec *to)
223  	{
224  		struct timespec ts;
225  	
226  		/* coarse nsec, not system time */
227  		(void)clock_gettime(CLOCK_MONOTONIC_FAST, &ts);
228  		timespecadd(&ts, to, &ts);
229  		return timespec_ms(&ts);
230  	}
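
     	/*
     	 * Worked example (assuming timespec_ms() yields whole milliseconds,
     	 * i.e. tv_sec * 1000 + tv_nsec / 1000000): with a monotonic "now" of
     	 * {100, 250000000} and a timeout of {2, 500000000}, timespecadd()
     	 * gives {102, 750000000} and the returned deadline is 102750 ms,
     	 * an absolute value directly comparable with timespec_ms() of the
     	 * current monotonic time in the epoll loop.
     	 */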
231  	
232  	void
233  	svc_rqst_expire_insert(struct clnt_req *cc)
234  	{
235  		struct cx_data *cx = CX_DATA(cc->cc_clnt);
236  		struct svc_rqst_rec *sr_rec = (struct svc_rqst_rec *)cx->cx_rec->ev_p;
237  		struct opr_rbtree_node *nv;
238  	
239  		cc->cc_expire_ms = svc_rqst_expire_ms(&cc->cc_timeout);
240  	
241  		mutex_lock(&sr_rec->ev_lock);
242  		cc->cc_flags = CLNT_REQ_FLAG_EXPIRING;
243  	 repeat:
244  		nv = opr_rbtree_insert(&sr_rec->call_expires, &cc->cc_rqst);
245  		if (nv) {
246  			/* add this slightly later */
247  			cc->cc_expire_ms++;
248  			goto repeat;
249  		}
250  		mutex_unlock(&sr_rec->ev_lock);
251  	
252  		ev_sig(sr_rec->sv[0], 0);	/* send wakeup */
253  	}
254  	
255  	void
256  	svc_rqst_expire_remove(struct clnt_req *cc)
257  	{
258  		struct cx_data *cx = CX_DATA(cc->cc_clnt);
259  		struct svc_rqst_rec *sr_rec = cx->cx_rec->ev_p;
260  	
261  		mutex_lock(&sr_rec->ev_lock);
262  		opr_rbtree_remove(&sr_rec->call_expires, &cc->cc_rqst);
263  		mutex_unlock(&sr_rec->ev_lock);
264  	
265  		ev_sig(sr_rec->sv[0], 0);	/* send wakeup */
266  	}
267  	
268  	static void
269  	svc_rqst_expire_task(struct work_pool_entry *wpe)
270  	{
271  		struct clnt_req *cc = opr_containerof(wpe, struct clnt_req, cc_wpe);
272  	
273  		if (atomic_fetch_int32_t(&cc->cc_refcnt) > 1
274  		 && !(atomic_postset_uint16_t_bits(&cc->cc_flags,
275  						   CLNT_REQ_FLAG_BACKSYNC)
276  		      & (CLNT_REQ_FLAG_ACKSYNC | CLNT_REQ_FLAG_BACKSYNC))) {
277  		/* (idempotent) cc_flags and cc_refcnt are set atomically.
278  		 * cc_refcnt needs to be more than 1 (this task).
279  			 */
280  			cc->cc_error.re_status = RPC_TIMEDOUT;
281  			(*cc->cc_process_cb)(cc);
282  		}
283  	
284  		clnt_req_release(cc);
285  	}
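
     	/*
     	 * Timeout flow: svc_rqst_expire_insert() places a clnt_req on the
     	 * channel's call_expires tree and wakes the epoll loop, which derives
     	 * its epoll_wait timeout from the earliest deadline; expired entries
     	 * are removed and handed to svc_rqst_expire_task(), which reports
     	 * RPC_TIMEDOUT through cc_process_cb unless the ACKSYNC/BACKSYNC
     	 * flags show the call has already completed or timed out.
     	 */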
286  	
287  	int
288  	svc_rqst_new_evchan(uint32_t *chan_id /* OUT */, void *u_data, uint32_t flags)
289  	{
290  		struct svc_rqst_rec *sr_rec;
291  		uint32_t n_id;
292  		int code = 0;
293  		work_pool_fun_t fun = svc_rqst_run_task;
294  	
295  		mutex_lock(&svc_rqst_set.mtx);
296  		if (!svc_rqst_set.next_id) {
297  			/* too many new channels, re-use global default, may be zero */
298  			*chan_id =
299  			svc_rqst_set.next_id = __svc_params->ev_u.evchan.id;
300  			mutex_unlock(&svc_rqst_set.mtx);
301  			return (0);
302  		}
303  		n_id = --(svc_rqst_set.next_id);
304  		sr_rec = &svc_rqst_set.srr[n_id];
305  	
306  		if (atomic_postinc_int32_t(&sr_rec->ev_refcnt) > 0) {
307  			/* already exists */
308  			*chan_id = n_id;
309  			mutex_unlock(&svc_rqst_set.mtx);
310  			return (0);
311  		}
312  	
313  		flags |= SVC_RQST_FLAG_EPOLL;	/* XXX */
314  	
315  		/* create a pair of anonymous sockets for async event channel wakeups */
316  		code = socketpair(AF_UNIX, SOCK_STREAM, 0, sr_rec->sv);
317  		if (code) {
318  			__warnx(TIRPC_DEBUG_FLAG_ERROR,
319  				"%s: failed creating event signal socketpair (%d)",
320  				__func__, code);
321  			++(svc_rqst_set.next_id);
322  			mutex_unlock(&svc_rqst_set.mtx);
323  			return (code);
324  		}
325  	
326  		/* set non-blocking */
327  		SetNonBlock(sr_rec->sv[0]);
328  		SetNonBlock(sr_rec->sv[1]);
329  	
330  	#if defined(TIRPC_EPOLL)
331  		if (flags & SVC_RQST_FLAG_EPOLL) {
332  			sr_rec->ev_type = SVC_EVENT_EPOLL;
333  			fun = svc_rqst_epoll_loop;
334  	
335  			/* XXX improve this too */
336  			sr_rec->ev_u.epoll.max_events =
337  			    __svc_params->ev_u.evchan.max_events;
338  			sr_rec->ev_u.epoll.events = (struct epoll_event *)
339  			    mem_alloc(sr_rec->ev_u.epoll.max_events *
340  				      sizeof(struct epoll_event));
341  	
342  			/* create epoll fd */
343  			sr_rec->ev_u.epoll.epoll_fd =
344  			    epoll_create_wr(sr_rec->ev_u.epoll.max_events,
345  					    EPOLL_CLOEXEC);
346  	
347  			if (sr_rec->ev_u.epoll.epoll_fd == -1) {
348  				__warnx(TIRPC_DEBUG_FLAG_ERROR,
349  					"%s: epoll_create failed (%d)", __func__,
350  					errno);
351  				mem_free(sr_rec->ev_u.epoll.events,
352  					 sr_rec->ev_u.epoll.max_events *
353  					 sizeof(struct epoll_event));
354  				++(svc_rqst_set.next_id);
355  				mutex_unlock(&svc_rqst_set.mtx);
356  				return (EINVAL);
357  			}
358  	
359  		/* permit wakeup of threads blocked in epoll_wait by adding the
360  		 * control socket (sv[1]) to the epoll interest set */
361  			sr_rec->ev_u.epoll.ctrl_ev.events =
362  			    EPOLLIN | EPOLLRDHUP;
363  			sr_rec->ev_u.epoll.ctrl_ev.data.fd = sr_rec->sv[1];
364  			code =
365  			    epoll_ctl(sr_rec->ev_u.epoll.epoll_fd, EPOLL_CTL_ADD,
366  				      sr_rec->sv[1], &sr_rec->ev_u.epoll.ctrl_ev);
367  			if (code == -1) {
368  				code = errno;
369  				__warnx(TIRPC_DEBUG_FLAG_ERROR,
370  					"%s: add control socket failed (%d)", __func__,
371  					code);
372  			}
373  		} else {
374  			/* legacy fdset (currently unhooked) */
375  			sr_rec->ev_type = SVC_EVENT_FDSET;
376  		}
377  	#else
378  		sr_rec->ev_type = SVC_EVENT_FDSET;
379  	#endif
380  	
381  		*chan_id =
382  		sr_rec->id_k = n_id;
383  		sr_rec->ev_flags = flags & SVC_RQST_FLAG_MASK;
384  		opr_rbtree_init(&sr_rec->call_expires, svc_rqst_expire_cmpf);
385  		mutex_init(&sr_rec->ev_lock, NULL);
386  	
387  		if (!code) {
388  			atomic_inc_int32_t(&sr_rec->ev_refcnt);
389  			sr_rec->ev_wpe.fun = fun;
390  			sr_rec->ev_wpe.arg = u_data;
391  			work_pool_submit(&svc_work_pool, &sr_rec->ev_wpe);
392  		}
393  		mutex_unlock(&svc_rqst_set.mtx);
394  	
395  		__warnx(TIRPC_DEBUG_FLAG_SVC_RQST,
396  			"%s: create evchan %d control fd pair (%d:%d)",
397  			__func__, n_id,
398  			sr_rec->sv[0], sr_rec->sv[1]);
399  		return (code);
400  	}
401  	
402  	static inline void
403  	svc_rqst_release(struct svc_rqst_rec *sr_rec)
404  	{
405  		if (atomic_dec_int32_t(&sr_rec->ev_refcnt) > 0)
406  			return;
407  	
408  		__warnx(TIRPC_DEBUG_FLAG_SVC_RQST,
409  			"%s: remove evchan %d control fd pair (%d:%d)",
410  			__func__, sr_rec->id_k,
411  			sr_rec->sv[0], sr_rec->sv[1]);
412  	
413  		mutex_destroy(&sr_rec->ev_lock);
414  	}
415  	
416  	/*
417  	 * may be RPC_DPLX_LOCKED, and SVC_XPRT_FLAG_ADDED cleared
418  	 */
419  	static inline int
420  	svc_rqst_unhook_events(struct rpc_dplx_rec *rec, struct svc_rqst_rec *sr_rec)
421  	{
422  		int code = EINVAL;
423  	
424  		switch (sr_rec->ev_type) {
425  	#if defined(TIRPC_EPOLL)
426  		case SVC_EVENT_EPOLL:
427  		{
428  			struct epoll_event *ev = &rec->ev_u.epoll.event;
429  	
430  			/* clear epoll vector */
431  			code = epoll_ctl(sr_rec->ev_u.epoll.epoll_fd,
432  					 EPOLL_CTL_DEL, rec->xprt.xp_fd, ev);
433  			if (code) {
434  				code = errno;
435  				__warnx(TIRPC_DEBUG_FLAG_WARN,
436  					"%s: %p fd %d xp_refcnt %" PRId32
437  					" sr_rec %p evchan %d ev_refcnt %" PRId32
438  					" epoll_fd %d control fd pair (%d:%d) unhook failed (%d)",
439  					__func__, rec, rec->xprt.xp_fd,
440  					rec->xprt.xp_refcnt,
441  					sr_rec, sr_rec->id_k, sr_rec->ev_refcnt,
442  					sr_rec->ev_u.epoll.epoll_fd,
443  					sr_rec->sv[0], sr_rec->sv[1], code);
444  			} else {
445  				__warnx(TIRPC_DEBUG_FLAG_SVC_RQST |
446  					TIRPC_DEBUG_FLAG_REFCNT,
447  					"%s: %p fd %d xp_refcnt %" PRId32
448  					" sr_rec %p evchan %d ev_refcnt %" PRId32
449  					" epoll_fd %d control fd pair (%d:%d) unhook",
450  					__func__, rec, rec->xprt.xp_fd,
451  					rec->xprt.xp_refcnt,
452  					sr_rec, sr_rec->id_k, sr_rec->ev_refcnt,
453  					sr_rec->ev_u.epoll.epoll_fd,
454  					sr_rec->sv[0], sr_rec->sv[1]);
455  			}
456  			break;
457  		}
458  	#endif
459  		default:
460  			/* XXX formerly select/fd_set case, now placeholder for new
461  			 * event systems, reworked select, etc. */
462  			break;
463  		}			/* switch */
464  	
465  		return (code);
466  	}
467  	
468  	/*
469  	 * not locked
470  	 */
471  	int
472  	svc_rqst_rearm_events(SVCXPRT *xprt)
473  	{
474  		struct rpc_dplx_rec *rec = REC_XPRT(xprt);
475  		struct svc_rqst_rec *sr_rec = (struct svc_rqst_rec *)rec->ev_p;
476  		int code = EINVAL;
477  	
478  		if (xprt->xp_flags & (SVC_XPRT_FLAG_ADDED | SVC_XPRT_FLAG_DESTROYED))
479  			return (0);
480  	
481  		/* MUST follow the destroyed check above */
482  		if (sr_rec->ev_flags & SVC_RQST_FLAG_SHUTDOWN)
483  			return (0);
484  	
485  		SVC_REF(xprt, SVC_REF_FLAG_NONE);
486  		rpc_dplx_rli(rec);
487  	
488  		/* assuming success */
489  		atomic_set_uint16_t_bits(&xprt->xp_flags, SVC_XPRT_FLAG_ADDED);
490  	
491  		switch (sr_rec->ev_type) {
492  	#if defined(TIRPC_EPOLL)
493  		case SVC_EVENT_EPOLL:
494  		{
495  			struct epoll_event *ev = &rec->ev_u.epoll.event;
496  	
497  			/* set up epoll user data */
498  			ev->events = EPOLLIN | EPOLLONESHOT;
499  	
500  			/* rearm in epoll vector */
501  			code = epoll_ctl(sr_rec->ev_u.epoll.epoll_fd,
502  					 EPOLL_CTL_MOD, xprt->xp_fd, ev);
503  			if (code) {
504  				code = errno;
505  				atomic_clear_uint16_t_bits(&xprt->xp_flags,
506  							   SVC_XPRT_FLAG_ADDED);
507  				__warnx(TIRPC_DEBUG_FLAG_ERROR,
508  					"%s: %p fd %d xp_refcnt %" PRId32
509  					" sr_rec %p evchan %d ev_refcnt %" PRId32
510  					" epoll_fd %d control fd pair (%d:%d) rearm failed (%d)",
511  					__func__, rec, rec->xprt.xp_fd,
512  					rec->xprt.xp_refcnt,
513  					sr_rec, sr_rec->id_k, sr_rec->ev_refcnt,
514  					sr_rec->ev_u.epoll.epoll_fd,
515  					sr_rec->sv[0], sr_rec->sv[1], code);
516  				SVC_RELEASE(xprt, SVC_RELEASE_FLAG_NONE);
517  			} else {
518  				__warnx(TIRPC_DEBUG_FLAG_SVC_RQST |
519  					TIRPC_DEBUG_FLAG_REFCNT,
520  					"%s: %p fd %d xp_refcnt %" PRId32
521  					" sr_rec %p evchan %d ev_refcnt %" PRId32
522  					" epoll_fd %d control fd pair (%d:%d) rearm",
523  					__func__, rec, rec->xprt.xp_fd,
524  					rec->xprt.xp_refcnt,
525  					sr_rec, sr_rec->id_k, sr_rec->ev_refcnt,
526  					sr_rec->ev_u.epoll.epoll_fd,
527  					sr_rec->sv[0], sr_rec->sv[1]);
528  			}
529  			break;
530  		}
531  	#endif
532  		default:
533  			/* XXX formerly select/fd_set case, now placeholder for new
534  			 * event systems, reworked select, etc. */
535  			break;
536  		}			/* switch */
537  	
538  		rpc_dplx_rui(rec);
539  	
540  		return (code);
541  	}
542  	
543  	/*
544  	 * RPC_DPLX_LOCKED, and SVC_XPRT_FLAG_ADDED set
545  	 */
546  	static inline int
547  	svc_rqst_hook_events(struct rpc_dplx_rec *rec, struct svc_rqst_rec *sr_rec)
548  	{
549  		int code = EINVAL;
550  	
551  		switch (sr_rec->ev_type) {
552  	#if defined(TIRPC_EPOLL)
553  		case SVC_EVENT_EPOLL:
554  		{
555  			struct epoll_event *ev = &rec->ev_u.epoll.event;
556  	
557  			/* set up epoll user data */
558  			ev->data.ptr = rec;
559  	
560  			/* wait for read events, level triggered, oneshot */
561  			ev->events = EPOLLIN | EPOLLONESHOT;
562  	
563  			/* add to epoll vector */
564  			code = epoll_ctl(sr_rec->ev_u.epoll.epoll_fd,
565  					 EPOLL_CTL_ADD, rec->xprt.xp_fd, ev);
566  			if (code) {
567  				code = errno;
568  				atomic_clear_uint16_t_bits(&rec->xprt.xp_flags,
569  							   SVC_XPRT_FLAG_ADDED);
570  				__warnx(TIRPC_DEBUG_FLAG_ERROR,
571  					"%s: %p fd %d xp_refcnt %" PRId32
572  					" sr_rec %p evchan %d ev_refcnt %" PRId32
573  					" epoll_fd %d control fd pair (%d:%d) hook failed (%d)",
574  					__func__, rec, rec->xprt.xp_fd,
575  					rec->xprt.xp_refcnt,
576  					sr_rec, sr_rec->id_k, sr_rec->ev_refcnt,
577  					sr_rec->ev_u.epoll.epoll_fd,
578  					sr_rec->sv[0], sr_rec->sv[1], code);
579  			} else {
580  				__warnx(TIRPC_DEBUG_FLAG_SVC_RQST |
581  					TIRPC_DEBUG_FLAG_REFCNT,
582  					"%s: %p fd %d xp_refcnt %" PRId32
583  					" sr_rec %p evchan %d ev_refcnt %" PRId32
584  					" epoll_fd %d control fd pair (%d:%d) hook",
585  					__func__, rec, rec->xprt.xp_fd,
586  					rec->xprt.xp_refcnt,
587  					sr_rec, sr_rec->id_k, sr_rec->ev_refcnt,
588  					sr_rec->ev_u.epoll.epoll_fd,
589  					sr_rec->sv[0], sr_rec->sv[1]);
590  			}
591  			break;
592  		}
593  	#endif
594  		default:
595  			/* XXX formerly select/fd_set case, now placeholder for new
596  			 * event systems, reworked select, etc. */
597  			break;
598  		}			/* switch */
599  	
600  		ev_sig(sr_rec->sv[0], 0);	/* send wakeup */
601  	
602  		return (code);
603  	}
604  	
605  	/*
606  	 * RPC_DPLX_LOCKED
607  	 */
608  	static void
609  	svc_rqst_unreg(struct rpc_dplx_rec *rec, struct svc_rqst_rec *sr_rec)
610  	{
611  		uint16_t xp_flags = atomic_postclear_uint16_t_bits(&rec->xprt.xp_flags,
612  								   SVC_XPRT_FLAG_ADDED);
613  	
614  		/* clear events */
615  		if (xp_flags & SVC_XPRT_FLAG_ADDED)
616  			(void)svc_rqst_unhook_events(rec, sr_rec);
617  	
618  		/* Unlinking after debug message ensures both the xprt and the sr_rec
619  		 * are still present, as the xprt unregisters before release.
620  		 */
621  		rec->ev_p = NULL;
622  		svc_rqst_release(sr_rec);
623  	}
624  	
625  	/*
626  	 * flags indicate locking state
627  	 */
628  	int
629  	svc_rqst_evchan_reg(uint32_t chan_id, SVCXPRT *xprt, uint32_t flags)
630  	{
631  		struct rpc_dplx_rec *rec = REC_XPRT(xprt);
632  		struct svc_rqst_rec *sr_rec;
633  		struct svc_rqst_rec *ev_p;
634  		int code;
635  		uint16_t bits = SVC_XPRT_FLAG_ADDED | (flags & SVC_XPRT_FLAG_UREG);
636  	
637  		if (chan_id == 0) {
638  			/* Create a global/legacy event channel */
639  			code = svc_rqst_new_evchan(&(__svc_params->ev_u.evchan.id),
640  						   NULL /* u_data */ ,
641  						   SVC_RQST_FLAG_CHAN_AFFINITY);
642  			if (code) {
643  				__warnx(TIRPC_DEBUG_FLAG_ERROR,
644  					"%s: %p failed to create global/legacy channel (%d)",
645  					__func__, xprt, code);
646  				return (code);
647  			}
648  			chan_id = __svc_params->ev_u.evchan.id;
649  		}
650  	
651  		sr_rec = svc_rqst_lookup_chan(chan_id);
652  		if (!sr_rec) {
653  			__warnx(TIRPC_DEBUG_FLAG_ERROR,
654  				"%s: %p unknown evchan %d",
655  				__func__, xprt, chan_id);
656  			return (ENOENT);
657  		}
658  	
659  		if (!(flags & RPC_DPLX_LOCKED))
660  			rpc_dplx_rli(rec);
661  	
662  		ev_p = (struct svc_rqst_rec *)rec->ev_p;
663  		if (ev_p) {
664  			if (ev_p == sr_rec) {
665  				if (!(flags & RPC_DPLX_LOCKED))
666  					rpc_dplx_rui(rec);
667  				__warnx(TIRPC_DEBUG_FLAG_SVC_RQST,
668  					"%s: %p already registered evchan %d",
669  					__func__, xprt, chan_id);
670  				return (0);
671  			}
672  			svc_rqst_unreg(rec, ev_p);
673  		}
674  	
675  		/* assuming success */
676  		atomic_set_uint16_t_bits(&xprt->xp_flags, bits);
677  	
678  		/* link from xprt */
679  		rec->ev_p = sr_rec;
680  	
681  		/* register on event channel */
682  		code = svc_rqst_hook_events(rec, sr_rec);
683  	
684  		if (!(flags & RPC_DPLX_LOCKED))
685  			rpc_dplx_rui(rec);
686  	
687  		return (code);
688  	}
689  	
690  	/*
691  	 * not locked
692  	 */
693  	int
694  	svc_rqst_xprt_register(SVCXPRT *newxprt, SVCXPRT *xprt)
695  	{
696  		struct svc_rqst_rec *sr_rec;
697  	
698  		/* if no parent xprt, use global/legacy event channel */
699  		if (!xprt)
700  			return svc_rqst_evchan_reg(__svc_params->ev_u.evchan.id,
701  						   newxprt,
702  						   SVC_RQST_FLAG_CHAN_AFFINITY);
703  	
704  		sr_rec = (struct svc_rqst_rec *) REC_XPRT(xprt)->ev_p;
705  	
706  		/* or if parent xprt has no dedicated event channel */
707  		if (!sr_rec)
708  			return svc_rqst_evchan_reg(__svc_params->ev_u.evchan.id,
709  						   newxprt,
710  						   SVC_RQST_FLAG_CHAN_AFFINITY);
711  	
712  		/* if round robin policy, begin with global/legacy event channel */
713  		if (!(sr_rec->ev_flags & SVC_RQST_FLAG_CHAN_AFFINITY)) {
714  			int code = svc_rqst_evchan_reg(round_robin, newxprt,
715  						       SVC_RQST_FLAG_NONE);
716  	
717  			if (!code) {
718  				/* advance round robin channel */
719  				code = svc_rqst_new_evchan(&round_robin, NULL,
720  							   SVC_RQST_FLAG_NONE);
721  			}
722  			return (code);
723  		}
724  	
725  		return svc_rqst_evchan_reg(sr_rec->id_k, newxprt, SVC_RQST_FLAG_NONE);
726  	}
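
     	/*
     	 * Channel-affinity sketch (illustrative; "listener_xprt" is a
     	 * hypothetical listening SVCXPRT): a listener whose channel was
     	 * created with SVC_RQST_FLAG_CHAN_AFFINITY keeps its accepted
     	 * transports on that same channel; without the flag, children are
     	 * spread round-robin across channels.
     	 *
     	 *	uint32_t listener_chan;
     	 *
     	 *	svc_rqst_new_evchan(&listener_chan, NULL, SVC_RQST_FLAG_NONE);
     	 *	svc_rqst_evchan_reg(listener_chan, listener_xprt,
     	 *			    SVC_RQST_FLAG_NONE);
     	 */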
727  	
728  	/*
729  	 * flags indicate locking state
730  	 *
731  	 * @note Locking
732  	 *	Called via svc_release_it() with once-only semantics.
733  	 */
734  	void
735  	svc_rqst_xprt_unregister(SVCXPRT *xprt, uint32_t flags)
736  	{
737  		struct rpc_dplx_rec *rec = REC_XPRT(xprt);
738  		struct svc_rqst_rec *sr_rec = (struct svc_rqst_rec *)rec->ev_p;
739  	
740  		/* Remove from the transport list here (and only here)
741  		 * before clearing the registration to ensure other
742  		 * lookups cannot re-use this transport.
743  		 */
744  		if (!(flags & RPC_DPLX_LOCKED))
745  			rpc_dplx_rli(rec);
746  	
747  		svc_xprt_clear(xprt);
748  	
749  		if (!(flags & RPC_DPLX_LOCKED))
750  			rpc_dplx_rui(rec);
751  	
752  		if (!sr_rec) {
753  			/* not currently registered */
754  			return;
755  		}
756  		svc_rqst_unreg(rec, sr_rec);
757  	}
758  	
759  	/*static*/ void
760  	svc_rqst_xprt_task(struct work_pool_entry *wpe)
761  	{
762  		struct rpc_dplx_rec *rec =
763  				opr_containerof(wpe, struct rpc_dplx_rec, ioq.ioq_wpe);
764  	
765  		atomic_clear_uint16_t_bits(&rec->ioq.ioq_s.qflags, IOQ_FLAG_WORKING);
766  	
767  		/* atomic barrier (above) should protect following values */
768  		if (rec->xprt.xp_refcnt > 1
769  		 && !(rec->xprt.xp_flags & SVC_XPRT_FLAG_DESTROYED)) {
770  		/* (idempotent) xp_flags and xp_refcnt are set atomically.
771  		 * xp_refcnt needs to be more than 1 (this task).
772  			 */
773  			(void)clock_gettime(CLOCK_MONOTONIC_FAST, &(rec->recv.ts));
774  			(void)SVC_RECV(&rec->xprt);
775  		}
776  	
777  		/* If tests fail, log non-fatal "WARNING! already destroying!" */
778  		SVC_RELEASE(&rec->xprt, SVC_RELEASE_FLAG_NONE);
779  	}
780  	
781  	enum xprt_stat svc_request(SVCXPRT *xprt, XDR *xdrs)
782  	{
783  		enum xprt_stat stat;
784  		struct svc_req *req = __svc_params->alloc_cb(xprt, xdrs);
785  		struct rpc_dplx_rec *rpc_dplx_rec = REC_XPRT(xprt);
786  	
787  		/* Track the request we are processing */
788  		rpc_dplx_rec->svc_req = req;
789  	
790  		/* All decode functions basically do a
791  		 * return xprt->xp_dispatch.process_cb(req);
792  		 */
793  		stat = SVC_DECODE(req);
794  	
795  		if (stat == XPRT_SUSPEND) {
796  		/* The request is suspended; don't touch the request in any way
797  			 * because the resume may already be scheduled and running on
798  			 * another thread.
799  			 */
800  			return XPRT_SUSPEND;
801  		}
802  	
803  		if (req->rq_auth)
804  			SVCAUTH_RELEASE(req);
805  	
806  		XDR_DESTROY(req->rq_xdrs);
807  	
808  		__svc_params->free_cb(req, stat);
809  	
810  		SVC_RELEASE(xprt, SVC_RELEASE_FLAG_NONE);
811  	
812  		return stat;
813  	}
814  	
815  	static void svc_resume_task(struct work_pool_entry *wpe)
816  	{
817  		struct rpc_dplx_rec *rec =
818  				opr_containerof(wpe, struct rpc_dplx_rec, ioq.ioq_wpe);
819  		struct svc_req *req = rec->svc_req;
820  		SVCXPRT *xprt = &rec->xprt;
821  		enum xprt_stat stat;
822  	
823  		/* Resume the request. */
824  		stat  = req->rq_xprt->xp_resume_cb(req);
825  	
826  		if (stat == XPRT_SUSPEND) {
827  		/* The request is suspended; don't touch the request in any way
828  			 * because the resume may already be scheduled and running on
829  			 * another thread.
830  			 */
831  			return;
832  		}
833  	
834  		if (req->rq_auth)
835  			SVCAUTH_RELEASE(req);
836  	
837  		XDR_DESTROY(req->rq_xdrs);
838  	
839  		__svc_params->free_cb(req, stat);
840  	
841  		SVC_RELEASE(xprt, SVC_RELEASE_FLAG_NONE);
842  	}
843  	
844  	void svc_resume(struct svc_req *req)
845  	{
846  		struct rpc_dplx_rec *rpc_dplx_rec = REC_XPRT(req->rq_xprt);
847  	
848  		rpc_dplx_rec->ioq.ioq_wpe.fun = svc_resume_task;
849  		work_pool_submit(&svc_work_pool, &(rpc_dplx_rec->ioq.ioq_wpe));
850  	}
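
     	/*
     	 * Suspend/resume sketch (hypothetical dispatcher; my_process_cb,
     	 * my_backend_ready and my_reply are illustrative names): a process
     	 * callback that cannot complete immediately returns XPRT_SUSPEND, and
     	 * svc_request() then leaves the request untouched; once the work is
     	 * ready, another thread calls svc_resume(req), which re-drives the
     	 * request through xp_resume_cb on a work-pool thread.
     	 *
     	 *	enum xprt_stat my_process_cb(struct svc_req *req)
     	 *	{
     	 *		if (!my_backend_ready(req))
     	 *			return (XPRT_SUSPEND);
     	 *		return (my_reply(req));
     	 *	}
     	 *
     	 *	(later, from the completion path)
     	 *	svc_resume(req);
     	 */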
851  	
852  	/*
853  	 * Like __svc_clean_idle but event-type independent.  For now no cleanfds.
854  	 */
855  	
856  	struct svc_rqst_clean_arg {
857  		struct timespec ts;
858  		int timeout;
859  		int cleaned;
860  	};
861  	
862  	static bool
863  	svc_rqst_clean_func(SVCXPRT *xprt, void *arg)
864  	{
865  		struct svc_rqst_clean_arg *acc = (struct svc_rqst_clean_arg *)arg;
866  	
867  		if (xprt->xp_ops == NULL)
868  			return (false);
869  	
870  		if (xprt->xp_flags & (SVC_XPRT_FLAG_DESTROYED | SVC_XPRT_FLAG_UREG))
871  			return (false);
872  	
873  		if ((acc->ts.tv_sec - REC_XPRT(xprt)->recv.ts.tv_sec) < acc->timeout)
874  			return (false);
875  	
876  		SVC_DESTROY(xprt);
877  		acc->cleaned++;
878  		return (true);
879  	}
880  	
881  	void authgss_ctx_gc_idle(void);
882  	
883  	static void
884  	svc_rqst_clean_idle(int timeout)
885  	{
886  		struct svc_rqst_clean_arg acc;
887  		static mutex_t active_mtx = MUTEX_INITIALIZER;
888  		static uint32_t active;
889  	
890  		if (mutex_trylock(&active_mtx) != 0)
891  			return;
892  	
893  		if (active > 0)
894  			goto unlock;
895  	
896  		++active;
897  	
898  	#ifdef _HAVE_GSSAPI
899  		/* trim gss context cache */
900  		authgss_ctx_gc_idle();
901  	#endif /* _HAVE_GSSAPI */
902  	
903  		if (timeout <= 0)
904  			goto unlock;
905  	
906  		/* trim xprts (not sorted, not aggressive [but self limiting]) */
907  		(void)clock_gettime(CLOCK_MONOTONIC_FAST, &acc.ts);
908  		acc.timeout = timeout;
909  		acc.cleaned = 0;
910  	
911  		svc_xprt_foreach(svc_rqst_clean_func, (void *)&acc);
912  	
913  	 unlock:
914  		--active;
915  		mutex_unlock(&active_mtx);
916  		return;
917  	}
918  	
919  	#ifdef TIRPC_EPOLL
920  	
921  	static struct rpc_dplx_rec *
922  	svc_rqst_epoll_event(struct svc_rqst_rec *sr_rec, struct epoll_event *ev)
923  	{
924  		struct rpc_dplx_rec *rec = (struct rpc_dplx_rec *) ev->data.ptr;
925  		uint16_t xp_flags;
926  	
927  		if (unlikely(ev->data.fd == sr_rec->sv[1])) {
928  			/* signalled -- there was a wakeup on ctrl_ev (see
929  			 * top-of-loop) */
930  			__warnx(TIRPC_DEBUG_FLAG_SVC_RQST,
931  				"%s: fd %d wakeup (sr_rec %p)",
932  				__func__, sr_rec->sv[1],
933  				sr_rec);
934  			(void)consume_ev_sig_nb(sr_rec->sv[1]);
935  			__warnx(TIRPC_DEBUG_FLAG_SVC_RQST,
936  				"%s: fd %d after consume sig (sr_rec %p)",
937  				__func__, sr_rec->sv[1],
938  				sr_rec);
939  			return (NULL);
940  		}
941  	
942  		/* Another task may release transport in parallel.
943  		 * We have a ref from being in epoll, but since epoll is one-shot, a new ref
944  		 * will be taken when we re-enter epoll.  Use this ref for the processor
945  		 * without taking another one.
946  		 */
947  	
948  		/* MUST handle flags after reference.
949  		 * Although another task may unhook, the error is non-fatal.
950  		 */
951  		xp_flags = atomic_postclear_uint16_t_bits(&rec->xprt.xp_flags,
952  							  SVC_XPRT_FLAG_ADDED);
953  	
954  		__warnx(TIRPC_DEBUG_FLAG_SVC_RQST |
955  			TIRPC_DEBUG_FLAG_REFCNT,
956  			"%s: %p fd %d xp_refcnt %" PRId32
957  			" event %d",
958  			__func__, rec, rec->xprt.xp_fd, rec->xprt.xp_refcnt,
959  			ev->events);
960  	
961  		if (rec->xprt.xp_refcnt > 1
962  		 && (xp_flags & SVC_XPRT_FLAG_ADDED)
963  		 && !(xp_flags & SVC_XPRT_FLAG_DESTROYED)
964  		 && !(atomic_postset_uint16_t_bits(&rec->ioq.ioq_s.qflags,
965  						   IOQ_FLAG_WORKING)
966  		      & IOQ_FLAG_WORKING)) {
967  		/* (idempotent) xp_flags and xp_refcnt are set atomically.
968  		 * xp_refcnt needs to be more than 1 (this event).
969  			 */
970  			return (rec);
971  		}
972  	
973  		/* Do not return destroyed transports.
974  		 * Probably log non-fatal "WARNING! already destroying!"
975  		 */
976  		SVC_RELEASE(&rec->xprt, SVC_RELEASE_FLAG_NONE);
977  		return (NULL);
978  	}
979  	
980  	/*
981  	 * not locked
982  	 */
983  	static inline struct rpc_dplx_rec *
984  	svc_rqst_epoll_events(struct svc_rqst_rec *sr_rec, int n_events)
985  	{
986  		struct rpc_dplx_rec *rec = NULL;
987  		int ix = 0;
988  	
989  		while (ix < n_events) {
990  			rec = svc_rqst_epoll_event(sr_rec,
991  						   &(sr_rec->ev_u.epoll.events[ix++]));
992  			if (rec)
993  				break;
994  		}
995  	
996  		if (!rec) {
997  			/* continue waiting for events with this task */
998  			return NULL;
999  		}
1000 	
1001 		while (ix < n_events) {
1002 			struct rpc_dplx_rec *rec = svc_rqst_epoll_event(sr_rec,
1003 						    &(sr_rec->ev_u.epoll.events[ix++]));
1004 			if (!rec)
1005 				continue;
1006 	
1007 			rec->ioq.ioq_wpe.fun = svc_rqst_xprt_task;
1008 			work_pool_submit(&svc_work_pool, &(rec->ioq.ioq_wpe));
1009 		}
1010 	
1011 		/* submit another task to handle events in order */
1012 		atomic_inc_int32_t(&sr_rec->ev_refcnt);
1013 		work_pool_submit(&svc_work_pool, &sr_rec->ev_wpe);
1014 	
1015 		return rec;
1016 	}
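
     	/*
     	 * Dispatch note: the first ready transport is returned and handled on
     	 * this (hot) thread, remaining transports are submitted to the work
     	 * pool, and a fresh epoll task is submitted so the channel keeps
     	 * polling while this thread does the receive work.
     	 */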
1017 	
1018 	static void svc_rqst_epoll_loop(struct work_pool_entry *wpe)
1019 	{
1020 		struct svc_rqst_rec *sr_rec =
1021 			opr_containerof(wpe, struct svc_rqst_rec, ev_wpe);
1022 		struct clnt_req *cc;
1023 		struct opr_rbtree_node *n;
1024 		struct timespec ts;
1025 		int timeout_ms;
1026 		int expire_ms;
1027 		int n_events;
1028 		bool finished;
1029 	
1030 		for (;;) {
1031 			timeout_ms = SVC_RQST_TIMEOUT_MS;
1032 	
1033 			/* coarse nsec, not system time */
1034 			(void)clock_gettime(CLOCK_MONOTONIC_FAST, &ts);
1035 			expire_ms = timespec_ms(&ts);
1036 	
1037 		/* handle call expirations before epoll_wait; epoll accumulates events during this scan */
1038 			mutex_lock(&sr_rec->ev_lock);
1039 			while ((n = opr_rbtree_first(&sr_rec->call_expires))) {
1040 				cc = opr_containerof(n, struct clnt_req, cc_rqst);
1041 	
1042 				if (cc->cc_expire_ms > expire_ms) {
1043 					timeout_ms = cc->cc_expire_ms - expire_ms;
1044 					break;
1045 				}
1046 	
1047 				/* order dependent */
1048 				atomic_clear_uint16_t_bits(&cc->cc_flags,
1049 							   CLNT_REQ_FLAG_EXPIRING);
1050 				opr_rbtree_remove(&sr_rec->call_expires, &cc->cc_rqst);
1051 				cc->cc_expire_ms = 0;	/* atomic barrier(s) */
1052 	
1053 				atomic_inc_uint32_t(&cc->cc_refcnt);
1054 				cc->cc_wpe.fun = svc_rqst_expire_task;
1055 				cc->cc_wpe.arg = NULL;
1056 				work_pool_submit(&svc_work_pool, &cc->cc_wpe);
1057 			}
1058 			mutex_unlock(&sr_rec->ev_lock);
1059 	
1060 			__warnx(TIRPC_DEBUG_FLAG_SVC_RQST,
1061 				"%s: epoll_fd %d before epoll_wait (%d)",
1062 				__func__,
1063 				sr_rec->ev_u.epoll.epoll_fd,
1064 				timeout_ms);
1065 	
1066 			n_events = epoll_wait(sr_rec->ev_u.epoll.epoll_fd,
1067 					      sr_rec->ev_u.epoll.events,
1068 					      sr_rec->ev_u.epoll.max_events,
1069 					      timeout_ms);
1070 	
1071 			if (unlikely(sr_rec->ev_flags & SVC_RQST_FLAG_SHUTDOWN)) {
1072 				__warnx(TIRPC_DEBUG_FLAG_SVC_RQST,
1073 					"%s: epoll_fd %d epoll_wait shutdown (%d)",
1074 					__func__,
1075 					sr_rec->ev_u.epoll.epoll_fd,
1076 					n_events);
1077 				finished = true;
1078 				break;
1079 			}
1080 			if (n_events > 0) {
1081 				atomic_add_uint32_t(&wakeups, n_events);
1082 				struct rpc_dplx_rec *rec;
1083 	
1084 				rec = svc_rqst_epoll_events(sr_rec, n_events);
1085 	
1086 				if (rec != NULL) {
1087 					/* use this hot thread for the first event */
1088 					rec->ioq.ioq_wpe.fun = svc_rqst_xprt_task;
1089 					svc_rqst_xprt_task(&(rec->ioq.ioq_wpe));
1090 	
1091 					/* failsafe idle processing after work task */
1092 					if (atomic_postclear_uint32_t_bits(
1093 						&wakeups, ~SVC_RQST_WAKEUPS)
1094 					    > SVC_RQST_WAKEUPS) {
1095 						svc_rqst_clean_idle(
1096 							__svc_params->idle_timeout);
1097 					}
1098 					finished = false;
1099 					break;
1100 				}
1101 				continue;
1102 			}
1103 			if (!n_events) {
1104 				/* timed out (idle) */
1105 				atomic_inc_uint32_t(&wakeups);
1106 				continue;
1107 			}
1108 			n_events = errno;
1109 			if (n_events != EINTR) {
1110 				__warnx(TIRPC_DEBUG_FLAG_WARN,
1111 					"%s: epoll_fd %d epoll_wait failed (%d)",
1112 					__func__,
1113 					sr_rec->ev_u.epoll.epoll_fd,
1114 					n_events);
1115 				finished = true;
1116 				break;
1117 			}
1118 		}
1119 		if (finished) {
1120 			close(sr_rec->ev_u.epoll.epoll_fd);
1121 			mem_free(sr_rec->ev_u.epoll.events,
1122 				 sr_rec->ev_u.epoll.max_events *
1123 				 sizeof(struct epoll_event));
1124 		}
1125 	
1126 		svc_complete_task(sr_rec, finished);
1127 	}
1128 	#endif
1129 	
1130 	static void svc_complete_task(struct svc_rqst_rec *sr_rec, bool finished)
1131 	{
1132 		if (finished) {
1133 			/* reference count here should be 2:
1134 			 *	1	svc_rqst_set
1135 			 *	+1	this work_pool thread
1136 		 * so DROP one here and the final release will go to 0.
1137 			 */
1138 			atomic_dec_int32_t(&sr_rec->ev_refcnt);	/* svc_rqst_set */
1139 		}
1140 		svc_rqst_release(sr_rec);
1141 	}
1142 	
1143 	/*
1144 	 * No locking, "there can be only one"
1145 	 */
1146 	static void
1147 	svc_rqst_run_task(struct work_pool_entry *wpe)
1148 	{
1149 		struct svc_rqst_rec *sr_rec =
1150 			opr_containerof(wpe, struct svc_rqst_rec, ev_wpe);
1151 	
1152 		/* enter event loop */
1153 		switch (sr_rec->ev_type) {
1154 		default:
1155 			/* XXX formerly select/fd_set case, now placeholder for new
1156 			 * event systems, reworked select, etc. */
1157 			__warnx(TIRPC_DEBUG_FLAG_ERROR,
1158 				"%s: unsupported event type",
1159 				__func__);
1160 			break;
1161 		}			/* switch */
1162 	
1163 		svc_complete_task(sr_rec, true);
1164 	}
1165 	
1166 	int
1167 	svc_rqst_thrd_signal(uint32_t chan_id, uint32_t flags)
1168 	{
1169 		struct svc_rqst_rec *sr_rec;
1170 	
1171 		sr_rec = svc_rqst_lookup_chan(chan_id);
1172 		if (!sr_rec) {
1173 			__warnx(TIRPC_DEBUG_FLAG_ERROR,
1174 				"%s: unknown evchan %d",
1175 				__func__, chan_id);
1176 			return (ENOENT);
1177 		}
1178 	
1179 		ev_sig(sr_rec->sv[0], flags);	/* send wakeup */
1180 	
1181 		__warnx(TIRPC_DEBUG_FLAG_ERROR,
1182 			"%s: signalled evchan %d",
1183 			__func__, chan_id);
1184 		svc_rqst_release(sr_rec);
1185 		return (0);
1186 	}
1187 	
1188 	static int
1189 	svc_rqst_delete_evchan(uint32_t chan_id)
1190 	{
1191 		struct svc_rqst_rec *sr_rec;
1192 		int code = 0;
1193 	
1194 		sr_rec = svc_rqst_lookup_chan(chan_id);
1195 		if (!sr_rec) {
1196 			return (ENOENT);
1197 		}
1198 		atomic_set_uint16_t_bits(&sr_rec->ev_flags, SVC_RQST_FLAG_SHUTDOWN);
1199 		ev_sig(sr_rec->sv[0], SVC_RQST_FLAG_SHUTDOWN);
1200 	
1201 		svc_rqst_release(sr_rec);
1202 		return (code);
1203 	}
1204 	
1205 	void
1206 	svc_rqst_shutdown(void)
1207 	{
1208 		uint32_t channels = svc_rqst_set.max_id;
1209 	
1210 		while (channels > 0) {
1211 			svc_rqst_delete_evchan(--channels);
1212 		}
1213 	}
1214