/*
 * Copyright (c) 2013-2015 CohortFS, LLC.
 * Copyright (c) 2013-2018 Red Hat, Inc. and/or its affiliates.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

/**
 * @file work_pool.c
 * @author William Allen Simpson <bill@cohortfs.com>
 * @brief Pthreads-based work queue package
 *
 * @section DESCRIPTION
 *
 * This provides simple work queues using pthreads and TAILQ primitives.
 *
 * @note    Loosely based upon previous thrdpool by
 *          Matt Benjamin <matt@cohortfs.com>
 */
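
/**
 * @section EXAMPLE
 *
 * A minimal usage sketch (not from this codebase): the embedding
 * struct and callback below (echo_task, echo_fun) are hypothetical;
 * the pool API and the params/entry field names are those used in
 * this file.  The callback receives the submitted work_pool_entry;
 * callers typically embed the entry as the first member of a larger
 * struct and recover that struct in the callback.
 *
 * @code
 * #include <stdio.h>
 * #include <stdlib.h>
 * #include <rpc/work_pool.h>
 *
 * struct echo_task {
 *	struct work_pool_entry wpe;	// first member, so the cast is valid
 *	const char *msg;
 * };
 *
 * static void
 * echo_fun(struct work_pool_entry *wpe)
 * {
 *	struct echo_task *task = (struct echo_task *)wpe;
 *
 *	printf("%s\n", task->msg);
 *	free(task);
 * }
 *
 * static int
 * echo_run(void)
 * {
 *	struct work_pool pool;
 *	struct work_pool_params params = {
 *		.thrd_min = 2,
 *		.thrd_max = 8,
 *	};
 *	struct echo_task *task = calloc(1, sizeof(*task));
 *	int rc;
 *
 *	if (!task)
 *		return ENOMEM;
 *	rc = work_pool_init(&pool, "echo", &params);
 *	if (rc)
 *		return rc;
 *
 *	task->wpe.fun = echo_fun;
 *	task->msg = "hello";
 *	(void)work_pool_submit(&pool, &task->wpe);
 *
 *	return work_pool_shutdown(&pool);
 * }
 * @endcode
 */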

#include "config.h"

#include <sys/types.h>
#if !defined(_WIN32)
#include <netinet/in.h>
#include <err.h>
#endif

#include <rpc/types.h>
#include "rpc_com.h"
#include <misc/abstract_atomic.h>
#include <misc/portable.h>
#include <stddef.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <intrinsic.h>
#include <urcu-bp.h>

#include <rpc/work_pool.h>

#define WORK_POOL_STACK_SIZE MAX(1 * 1024 * 1024, PTHREAD_STACK_MIN)
#define WORK_POOL_TIMEOUT_MS (31 /* seconds (prime) */ * 1000)

/* forward declaration in lieu of moving code, was inline */

static int work_pool_spawn(struct work_pool *pool);

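/**
 * @brief Initialize a work pool and spawn its first worker
 *
 * @param[in] pool	caller-allocated pool, overwritten here
 * @param[in] name	short name; its first 5 chars prefix worker names
 * @param[in] params	thread limits, copied into the pool
 *
 * @returns 0 on success, or an errno value from pthread attribute
 * setup or thread creation.
 */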
int
work_pool_init(struct work_pool *pool, const char *name,
		struct work_pool_params *params)
{
	int rc;

	memset(pool, 0, sizeof(*pool));
	poolq_head_setup(&pool->pqh);
	TAILQ_INIT(&pool->wptqh);

	/* No lock needed: the pool is not yet visible to any worker
	 * (the first worker is spawned at the end of this function), so
	 * this store cannot race with the locked accesses elsewhere.
	 * Static analysis flags this as a missing_lock on timeout_ms.
	 */
	pool->timeout_ms = WORK_POOL_TIMEOUT_MS;

	pool->name = mem_strdup(name);
	pool->params = *params;

	if (pool->params.thrd_min < 1) {
		__warnx(TIRPC_DEBUG_FLAG_ERROR,
			"%s() thrd_min (%d) < 1",
			__func__, pool->params.thrd_min);
		pool->params.thrd_min = 1;
	}

	if (pool->params.thrd_max < pool->params.thrd_min) {
		__warnx(TIRPC_DEBUG_FLAG_ERROR,
			"%s() thrd_max (%d) < thrd_min (%d)",
			__func__, pool->params.thrd_max, pool->params.thrd_min);
		pool->params.thrd_max = pool->params.thrd_min;
	}

	rc = pthread_attr_init(&pool->attr);
	if (rc) {
		__warnx(TIRPC_DEBUG_FLAG_ERROR,
			"%s() can't init pthread's attributes: %s (%d)",
			__func__, strerror(rc), rc);
		return rc;
	}

	rc = pthread_attr_setscope(&pool->attr, PTHREAD_SCOPE_SYSTEM);
	if (rc) {
		__warnx(TIRPC_DEBUG_FLAG_ERROR,
			"%s() can't set pthread's scope: %s (%d)",
			__func__, strerror(rc), rc);
		return rc;
	}

	rc = pthread_attr_setdetachstate(&pool->attr, PTHREAD_CREATE_DETACHED);
	if (rc) {
		__warnx(TIRPC_DEBUG_FLAG_ERROR,
			"%s() can't set pthread's detach state: %s (%d)",
			__func__, strerror(rc), rc);
		return rc;
	}

	rc = pthread_attr_setstacksize(&pool->attr, WORK_POOL_STACK_SIZE);
	if (rc) {
		/* non-fatal: fall back to the default stack size */
		__warnx(TIRPC_DEBUG_FLAG_ERROR,
			"%s() can't set pthread's stack size: %s (%d)",
			__func__, strerror(rc), rc);
	}

	/* the initial worker will spawn more threads as needed */
	pool->n_threads = 1;
	return work_pool_spawn(pool);
}

/**
 * @brief The worker thread
 *
 * This is the body of the worker thread. The argument is a pointer to
 * its working context, kept in a list for each pool.
 *
 * @param[in] arg	thread context
 */

static void *
work_pool_thread(void *arg)
{
	struct work_pool_thread *wpt = arg;
	struct work_pool *pool = wpt->pool;
	struct poolq_entry *have;
	struct timespec ts;
	int rc;
	bool spawn;

	rcu_register_thread();

	pthread_cond_init(&wpt->pqcond, NULL);
	pthread_mutex_lock(&pool->pqh.qmutex);
	TAILQ_INSERT_TAIL(&pool->wptqh, wpt, wptq);

	wpt->worker_index = atomic_inc_uint32_t(&pool->worker_index);
	snprintf(wpt->worker_name, sizeof(wpt->worker_name), "%.5s%" PRIu32,
		 pool->name, wpt->worker_index);
	__ntirpc_pkg_params.thread_name_(wpt->worker_name);

	do {
		/* testing at top of loop allows pre-specification of work,
		 * and thread termination after timeout with no work (below).
		 */
		if (wpt->work) {
			wpt->work->wpt = wpt;
			spawn = pool->pqh.qcount < pool->params.thrd_min
			      && pool->n_threads < pool->params.thrd_max;
			if (spawn)
				pool->n_threads++;
			pthread_mutex_unlock(&pool->pqh.qmutex);

			if (spawn) {
				/* busy, so dynamically add another thread */
				(void)work_pool_spawn(pool);
			}

			__warnx(TIRPC_DEBUG_FLAG_WORKER,
				"%s() %s task %p",
				__func__, wpt->worker_name, wpt->work);
			wpt->work->fun(wpt->work);
			wpt->work = NULL;
			pthread_mutex_lock(&pool->pqh.qmutex);
		}

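		/* pqh.qcount is a signed balance: negative counts queued
		 * tasks awaiting a worker, positive counts idle workers
		 * parked on the same (otherwise empty) queue, and zero
		 * means both are empty.
		 */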
		if (0 > pool->pqh.qcount++) {
			/* negative for task(s) */
			have = TAILQ_FIRST(&pool->pqh.qh);
			TAILQ_REMOVE(&pool->pqh.qh, have, q);

			wpt->work = (struct work_pool_entry *)have;
			continue;
		}

		/* positive for waiting worker(s):
		 * use the otherwise empty pool to hold them,
		 * simplifying mutex and pointer setup.
		 */
		TAILQ_INSERT_TAIL(&pool->pqh.qh, &wpt->pqe, q);

		__warnx(TIRPC_DEBUG_FLAG_WORKER,
			"%s() %s waiting",
			__func__, wpt->worker_name);

		clock_gettime(CLOCK_REALTIME_FAST, &ts);
		timespec_addms(&ts, pool->timeout_ms);

		/* Note: the mutex is the pool _head,
		 * but the condition is per worker,
		 * making the signal efficient!
		 */
		rc = pthread_cond_timedwait(&wpt->pqcond, &pool->pqh.qmutex,
					    &ts);
		if (!wpt->work) {
			/* Allow for a timing race: if a submitter assigned
			 * work between our wakeup (timeout or shutdown) and
			 * reacquiring the mutex, it has already dequeued us;
			 * otherwise, remove ourselves here.
			 */
			pool->pqh.qcount--;
			TAILQ_REMOVE(&pool->pqh.qh, &wpt->pqe, q);
		}
		if (rc && rc != ETIMEDOUT) {
			__warnx(TIRPC_DEBUG_FLAG_ERROR,
				"%s() cond_timedwait failed (%d)\n",
				__func__, rc);
			break;
		}
	} while (wpt->work || pool->pqh.qcount < pool->params.thrd_min);

	pool->n_threads--;
	TAILQ_REMOVE(&pool->wptqh, wpt, wptq);
	pthread_mutex_unlock(&pool->pqh.qmutex);

	__warnx(TIRPC_DEBUG_FLAG_WORKER,
		"%s() %s terminating",
		__func__, wpt->worker_name);
	cond_destroy(&wpt->pqcond);
	mem_free(wpt, sizeof(*wpt));
	rcu_unregister_thread();

	return (NULL);
}

static int
work_pool_spawn(struct work_pool *pool)
{
	int rc;
	struct work_pool_thread *wpt = mem_zalloc(sizeof(*wpt));

	wpt->pool = pool;

	rc = pthread_create(&wpt->pt, &pool->attr, work_pool_thread, wpt);
	if (rc) {
		__warnx(TIRPC_DEBUG_FLAG_ERROR,
			"%s() pthread_create failed (%d)\n",
			__func__, rc);
		mem_free(wpt, sizeof(*wpt));	/* no thread took ownership */
		return rc;
	}

	return (0);
}

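/**
 * @brief Submit a work entry to the pool
 *
 * Hands the entry directly to a parked worker when one is waiting,
 * otherwise queues it until a worker becomes available.  Silently
 * discards work while the pool is shutting down (thrd_max == 0).
 *
 * @param[in] pool	the work pool
 * @param[in] work	entry with its fun callback already set
 */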
int
work_pool_submit(struct work_pool *pool, struct work_pool_entry *work)
{
	int rc = 0;

	if (unlikely(!pool->params.thrd_max)) {
		/* queue is draining */
		return (0);
	}
	pthread_mutex_lock(&pool->pqh.qmutex);

	if (0 < pool->pqh.qcount--) {
		/* positive for waiting worker(s);
		 * the cast relies on pqe being the first member of
		 * struct work_pool_thread.
		 */
		struct work_pool_thread *wpt = (struct work_pool_thread *)
			TAILQ_FIRST(&pool->pqh.qh);

		TAILQ_REMOVE(&pool->pqh.qh, &wpt->pqe, q);
		wpt->work = work;

		/* Note: the mutex is the pool _head,
		 * but the condition is per worker,
		 * making the signal efficient!
		 */
		pthread_cond_signal(&wpt->pqcond);
	} else {
		/* negative for task(s) */
		TAILQ_INSERT_TAIL(&pool->pqh.qh, &work->pqe, q);
	}

	pthread_mutex_unlock(&pool->pqh.qmutex);
	return rc;
}

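/**
 * @brief Shut down the pool and reclaim its resources
 *
 * Shortens the idle timeout, wakes all parked workers, then polls
 * until every worker has exited.  Workers still drain any queued
 * entries before terminating.
 *
 * @param[in] pool	the work pool, previously work_pool_init()'d
 */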
int
work_pool_shutdown(struct work_pool *pool)
{
	struct work_pool_thread *wpt;
	struct timespec ts = {
		.tv_sec = 0,
		.tv_nsec = 3000,
	};

	pthread_mutex_lock(&pool->pqh.qmutex);
	/* timeout_ms is written under pqh.qmutex here; workers read it
	 * under the same mutex in work_pool_thread, so the shortened
	 * timeout is seen on their next pass through the loop.
	 */
	pool->timeout_ms = 1;
	pool->params.thrd_max =
	pool->params.thrd_min = 0;

	/* wake all parked workers so they notice the shutdown */
	wpt = TAILQ_FIRST(&pool->wptqh);
	while (wpt) {
		pthread_cond_signal(&wpt->pqcond);
		wpt = TAILQ_NEXT(wpt, wptq);
	}

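	/* Poll briefly (with the mutex dropped) until the last worker
	 * has removed itself from wptqh and decremented n_threads.
	 */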
	while (pool->n_threads > 0) {
		pthread_mutex_unlock(&pool->pqh.qmutex);
		__warnx(TIRPC_DEBUG_FLAG_WORKER,
			"%s() \"%s\" %" PRIu32,
			__func__, pool->name, pool->n_threads);
		nanosleep(&ts, NULL);
		pthread_mutex_lock(&pool->pqh.qmutex);
	}
	pthread_mutex_unlock(&pool->pqh.qmutex);

	mem_free(pool->name, 0);
	poolq_head_destroy(&pool->pqh);

	return (0);
}