/*
 * Copyright (c) 2013 Linux Box Corporation.
 * Copyright (c) 2013-2017 Red Hat, Inc. and/or its affiliates.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "config.h"
#include <sys/cdefs.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/param.h>
#include <sys/poll.h>

#include <sys/un.h>
#include <sys/time.h>
#include <sys/uio.h>
#include <netinet/in.h>
#include <netinet/tcp.h>

#include <assert.h>
#include <err.h>
#include <errno.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <signal.h>
#include <misc/timespec.h>

#include <rpc/types.h>
#include <misc/portable.h>
#include <rpc/rpc.h>
#include <rpc/svc.h>
#include <rpc/svc_auth.h>

#include <intrinsic.h>
#include "rpc_com.h"
#include "clnt_internal.h"
#include "svc_internal.h"
#include "svc_xprt.h"
#include "rpc_dplx_internal.h"
#include <rpc/svc_rqst.h>
#include <rpc/xdr_ioq.h>
#include <getpeereid.h>
#include <misc/opr.h>
#include "svc_ioq.h"

/* Send queues, configurable using RPC_Ioq_ThrdMax
 *
 * Ideally, these would be some variant of weighted fair queuing.  For now,
 * we assume any fairness is supplied by the underlying OS.
 *
 * The assigned thread should have affinity for the interface.  Therefore, the
 * first thread arriving for each interface is used for all subsequent work,
 * until the interface is idle.  This assumes that the output interface is
 * closely associated with the input interface.
 *
 * Note that this is a fixed size list of interfaces.  In most cases,
 * many of these entries will be unused.
 *
 * For efficiency, a mask is applied to the transport's socket fd (standing in
 * for the interface), possibly mapping multiple interfaces to the same entry.
 * The size is selected to be larger than the expected number of concurrently
 * active interfaces, and must be a power of 2 for the mask to work.
 */
static int num_send_queues; /* must be a power of 2 */
static struct poolq_head *ioq_ifqh;

static inline int
svc_ioq_mask(int fd)
{
	return fd & (num_send_queues - 1); /* num_send_queues is a power of 2 */
}
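
/*
 * Illustration (hypothetical values, not from any configuration above): with
 * num_send_queues == 8 the mask is 7, so svc_ioq_mask(21) == 5 and
 * svc_ioq_mask(13) == 5 as well.  Distinct fds may therefore share a queue;
 * that only serializes their output on one worker, it does not affect
 * correctness.
 */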

void
svc_ioq_init(void)
{
	struct poolq_head *ifph;
	int i;

	/* We would like the number of send queues to be close to half of
	 * thrd_max.  The count must also be a power of 2 for quick bitmask
	 * hashing!
	 */
	num_send_queues = 1;
	while (num_send_queues * 2 < __svc_params->ioq.thrd_max / 2)
		num_send_queues <<= 1;

	ioq_ifqh = mem_calloc(num_send_queues, sizeof(struct poolq_head));
	for (i = 0, ifph = &ioq_ifqh[0]; i < num_send_queues; ifph++, i++) {
		ifph->qcount = 0;
		TAILQ_INIT(&ifph->qh);
		mutex_init(&ifph->qmutex, NULL);
	}
}
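
/*
 * Worked example (thrd_max value is hypothetical): with
 * __svc_params->ioq.thrd_max == 200 the loop above doubles num_send_queues
 * while (num_send_queues * 2) < 100, stopping at 64, i.e. roughly the
 * smallest power of 2 that is at least thrd_max / 4.  Small thrd_max values
 * leave a single queue.
 */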

#define LAST_FRAG ((u_int32_t)(1 << 31))
#define MAXALLOCA (256)
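
/*
 * LAST_FRAG follows the ONC RPC record marking convention for stream
 * transports (RFC 5531): each fragment is preceded by a 4-byte, big-endian
 * header whose most significant bit marks the last fragment of a record and
 * whose low 31 bits give the fragment length.  A minimal sketch of encoding
 * the header for a 100-byte final fragment, mirroring the code below:
 *
 *	u_int32_t frag_header = htonl((u_int32_t)(100 | LAST_FRAG));
 */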

static inline void
svc_ioq_flushv(SVCXPRT *xprt, struct xdr_ioq *xioq)
{
	struct iovec *iov, *tiov, *wiov;
	struct poolq_entry *have;
	struct xdr_ioq_uv *data;
	ssize_t result;
	u_int32_t frag_header;
	u_int32_t fbytes;
	u_int32_t remaining = 0;
	u_int32_t vsize = (xioq->ioq_uv.uvqh.qcount + 1) * sizeof(struct iovec);
	int iw = 0;
	int ix = 1;

	if (unlikely(vsize > MAXALLOCA)) {
		iov = mem_alloc(vsize);
	} else {
		iov = alloca(vsize);
	}
	wiov = iov; /* position at initial fragment header */

	/* update the most recent data length, just in case */
	xdr_tail_update(xioq->xdrs);

	/* build list after initial fragment header (ix = 1 above) */
	TAILQ_FOREACH(have, &(xioq->ioq_uv.uvqh.qh), q) {
		data = IOQ_(have);
		tiov = iov + ix;
		tiov->iov_base = data->v.vio_head;
		tiov->iov_len = ioquv_length(data);
		remaining += tiov->iov_len;
		ix++;
	}

	while (remaining > 0) {
		if (iw == 0) {
			/* new fragment header, determine last iov */
			fbytes = 0;
			for (tiov = &wiov[++iw];
			     (tiov < &iov[ix]) && (iw < __svc_maxiov);
			     ++tiov, ++iw) {
				fbytes += tiov->iov_len;

				/* check for fragment value overflow */
				/* never happens, see ganesha FSAL_MAXIOSIZE */
				if (unlikely(fbytes >= LAST_FRAG)) {
					fbytes -= tiov->iov_len;
					break;
				}
			} /* for */

			/* fragment length doesn't include fragment header */
			if (&wiov[iw] < &iov[ix]) {
				frag_header = htonl((u_int32_t) (fbytes));
			} else {
				frag_header = htonl((u_int32_t) (fbytes | LAST_FRAG));
			}
			wiov->iov_base = &(frag_header);
			wiov->iov_len = sizeof(u_int32_t);

			/* writev return includes fragment header */
			remaining += sizeof(u_int32_t);
			fbytes += sizeof(u_int32_t);
		}

		/* blocking write */
		result = writev(xprt->xp_fd, wiov, iw);
		remaining -= result;

		if (result == fbytes) {
			wiov += iw - 1;
			iw = 0;
			continue;
		}
		if (unlikely(result < 0)) {
			__warnx(TIRPC_DEBUG_FLAG_ERROR,
				"%s() writev failed (%d)\n",
				__func__, errno);
			SVC_DESTROY(xprt);
			break;
		}
		fbytes -= result;

		/* rare: writev wrote fewer bytes than requested (assume it
		 * never writes more); skip the iovecs that were fully
		 * written and advance into the first partially written one
		 */
		for (tiov = wiov; iw > 0; ++tiov, --iw) {
			if (tiov->iov_len > result) {
				tiov->iov_len -= result;
				tiov->iov_base += result;
				wiov = tiov;
				break;
			} else {
				result -= tiov->iov_len;
			}
		} /* for */
	} /* while */

	if (unlikely(vsize > MAXALLOCA)) {
		mem_free(iov, vsize);
	}
}
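
/*
 * Short-write illustration for the loop above (sizes are hypothetical): with
 * a 4-byte fragment header followed by data iovecs of 100 and 200 bytes, a
 * writev() return of 154 skips the header and the first iovec (4 + 100
 * bytes), then advances the second iovec's base by 50 and shrinks its length
 * to 150, so the next writev() resumes exactly where the kernel stopped.
 */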

/*
 * Drain the per-queue list: ifph->qcount counts the request currently being
 * written plus any queued behind it, so this loop keeps pulling entries and
 * flushing them until the count reaches zero and the queue goes idle.
 */
static void
svc_ioq_write(SVCXPRT *xprt, struct xdr_ioq *xioq, struct poolq_head *ifph)
{
	struct poolq_entry *have;

	for (;;) {
		/* do i/o unlocked */
		if (svc_work_pool.params.thrd_max
		 && !(xprt->xp_flags & SVC_XPRT_FLAG_DESTROYED)) {
			/* all systems are go! */
			svc_ioq_flushv(xprt, xioq);
		}
(1) Event unlock: "svc_release_it" unlocks "xprt->xp_lock".
		SVC_RELEASE(xprt, SVC_RELEASE_FLAG_NONE);
		XDR_DESTROY(xioq->xdrs);

		mutex_lock(&ifph->qmutex);
		if (--(ifph->qcount) == 0)
			break;

		have = TAILQ_FIRST(&ifph->qh);
		TAILQ_REMOVE(&ifph->qh, have, q);
		mutex_unlock(&ifph->qmutex);

		xioq = _IOQ(have);
		xprt = (SVCXPRT *)xioq->xdrs[0].x_lib[1];
	}
	mutex_unlock(&ifph->qmutex);
}
249  	
250  	static void
251  	svc_ioq_write_callback(struct work_pool_entry *wpe)
252  	{
253  		struct xdr_ioq *xioq = opr_containerof(wpe, struct xdr_ioq, ioq_wpe);
254  		SVCXPRT *xprt = (SVCXPRT *)xioq->xdrs[0].x_lib[1];
255  		struct poolq_head *ifph = &ioq_ifqh[svc_ioq_mask(xprt->xp_fd)];
256  	
257  		svc_ioq_write(xprt, xioq, ifph);
258  	}
259  	
260  	void
261  	svc_ioq_write_now(SVCXPRT *xprt, struct xdr_ioq *xioq)
262  	{
263  		struct poolq_head *ifph = &ioq_ifqh[svc_ioq_mask(xprt->xp_fd)];
264  	
(1) Event unlock: "svc_ref_it" unlocks "xprt->xp_lock". [details]
Also see events: [double_unlock]
265  		SVC_REF(xprt, SVC_REF_FLAG_NONE);
266  		mutex_lock(&ifph->qmutex);
267  	
(2) Event cond_false: Condition "ifph->qcount++ > 0", taking false branch.
268  		if ((ifph->qcount)++ > 0) {
269  			/* queue additional output requests without task switch */
270  			TAILQ_INSERT_TAIL(&ifph->qh, &(xioq->ioq_s), q);
271  			mutex_unlock(&ifph->qmutex);
272  			return;
(3) Event if_end: End of if statement.
273  		}
274  		mutex_unlock(&ifph->qmutex);
275  	
276  		/* handle this output request without queuing, then any additional
277  		 * output requests without a task switch (using this thread).
278  		 */
(4) Event double_unlock: "svc_ioq_write" unlocks "xprt->xp_lock" while it is unlocked. [details]
Also see events: [unlock]
279  		svc_ioq_write(xprt, xioq, ifph);
280  	}
281  	
282  	/*
283  	 * Handle rare case of first output followed by heavy traffic that prevents the
284  	 * original thread from continuing for too long.
285  	 *
286  	 * In the more common case, server traffic will already have begun and this
287  	 * will rapidly queue the output and return.
288  	 */
289  	void
290  	svc_ioq_write_submit(SVCXPRT *xprt, struct xdr_ioq *xioq)
291  	{
292  		struct poolq_head *ifph = &ioq_ifqh[svc_ioq_mask(xprt->xp_fd)];
293  	
294  		SVC_REF(xprt, SVC_REF_FLAG_NONE);
295  		mutex_lock(&ifph->qmutex);
296  	
297  		if ((ifph->qcount)++ > 0) {
298  			/* queue additional output requests, they will be handled by
299  			 * existing thread without another task switch.
300  			 */
301  			TAILQ_INSERT_TAIL(&ifph->qh, &(xioq->ioq_s), q);
302  			mutex_unlock(&ifph->qmutex);
303  			return;
304  		}
305  		mutex_unlock(&ifph->qmutex);
306  	
307  		xioq->ioq_wpe.fun = svc_ioq_write_callback;
308  		work_pool_submit(&svc_work_pool, &xioq->ioq_wpe);
309  	}
310
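
/*
 * Summary of the two entry points (an editor's reading of the code above,
 * not an interface contract):
 *
 *   svc_ioq_write_now()    - if no writer is active for this queue, flush on
 *                            the calling thread and then drain anything that
 *                            was queued behind it; otherwise just enqueue.
 *   svc_ioq_write_submit() - same queueing test, but a fresh flush is handed
 *                            to svc_work_pool via svc_ioq_write_callback()
 *                            so the caller returns immediately.
 *
 * In both cases the xioq carries one SVC_REF() on its transport, released by
 * SVC_RELEASE() in svc_ioq_write() after the flush, and the xioq's XDR
 * stream is destroyed there as well.
 */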