1 /*
2 * Copyright (c) 2013-2015 CohortFS, LLC.
3 * Copyright (c) 2013-2018 Red Hat, Inc. and/or its affiliates.
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions
8 * are met:
9 * 1. Redistributions of source code must retain the above copyright
10 * notice, this list of conditions and the following disclaimer.
11 * 2. Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in the
13 * documentation and/or other materials provided with the distribution.
14 *
15 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR `AS IS'' AND ANY EXPRESS OR
16 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
17 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
18 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
19 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
20 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
21 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
22 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
23 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
24 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
25 */
26
27 /**
28 * @file work_pool.c
29 * @author William Allen Simpson <bill@cohortfs.com>
30 * @brief Pthreads-based work queue package
31 *
32 * @section DESCRIPTION
33 *
34 * This provides simple work queues using pthreads and TAILQ primitives.
35 *
36 * @note Loosely based upon previous thrdpool by
37 * Matt Benjamin <matt@cohortfs.com>
38 */
39
40 #include "config.h"
41
42 #include <sys/types.h>
43 #if !defined(_WIN32)
44 #include <netinet/in.h>
45 #include <err.h>
46 #endif
47
48 #include <rpc/types.h>
49 #include "rpc_com.h"
50 #include <sys/types.h>
51 #include <misc/abstract_atomic.h>
52 #include <misc/portable.h>
53 #include <stddef.h>
54 #include <stdlib.h>
55 #include <string.h>
56 #include <errno.h>
57 #include <intrinsic.h>
58 #include <urcu-bp.h>
59
60 #include <rpc/work_pool.h>
61
62 #define WORK_POOL_STACK_SIZE MAX(1 * 1024 * 1024, PTHREAD_STACK_MIN)
63 #define WORK_POOL_TIMEOUT_MS (31 /* seconds (prime) */ * 1000)
64
65 /* forward declaration in lieu of moving code, was inline */
66
67 static int work_pool_spawn(struct work_pool *pool);
68
69 int
70 work_pool_init(struct work_pool *pool, const char *name,
71 struct work_pool_params *params)
72 {
73 int rc;
74
75 memset(pool, 0, sizeof(*pool));
76 poolq_head_setup(&pool->pqh);
77 TAILQ_INIT(&pool->wptqh);
78
79 pool->timeout_ms = WORK_POOL_TIMEOUT_MS;
80
81 pool->name = mem_strdup(name);
82 pool->params = *params;
83
84 if (pool->params.thrd_min < 1) {
85 __warnx(TIRPC_DEBUG_FLAG_ERROR,
86 "%s() thrd_min (%d) < 1",
87 __func__, pool->params.thrd_min);
88 pool->params.thrd_min = 1;
89 };
90
91 if (pool->params.thrd_max < pool->params.thrd_min) {
92 __warnx(TIRPC_DEBUG_FLAG_ERROR,
93 "%s() thrd_max (%d) < thrd_min (%d)",
94 __func__, pool->params.thrd_max, pool->params.thrd_min);
95 pool->params.thrd_max = pool->params.thrd_min;
96 };
97
98 rc = pthread_attr_init(&pool->attr);
99 if (rc) {
100 __warnx(TIRPC_DEBUG_FLAG_ERROR,
101 "%s() can't init pthread's attributes: %s (%d)",
102 __func__, strerror(rc), rc);
103 return rc;
104 }
105
106 rc = pthread_attr_setscope(&pool->attr, PTHREAD_SCOPE_SYSTEM);
107 if (rc) {
108 __warnx(TIRPC_DEBUG_FLAG_ERROR,
109 "%s() can't set pthread's scope: %s (%d)",
110 __func__, strerror(rc), rc);
111 return rc;
112 }
113
114 rc = pthread_attr_setdetachstate(&pool->attr, PTHREAD_CREATE_DETACHED);
115 if (rc) {
116 __warnx(TIRPC_DEBUG_FLAG_ERROR,
117 "%s() can't set pthread's join state: %s (%d)",
118 __func__, strerror(rc), rc);
119 return rc;
120 }
121
122 rc = pthread_attr_setstacksize(&pool->attr, WORK_POOL_STACK_SIZE);
123 if (rc) {
124 __warnx(TIRPC_DEBUG_FLAG_ERROR,
125 "%s() can't set pthread's stack size: %s (%d)",
126 __func__, strerror(rc), rc);
127 }
128
129 /* initial spawn will spawn more threads as needed */
130 pool->n_threads = 1;
131 return work_pool_spawn(pool);
132 }
133
134 /**
135 * @brief The worker thread
136 *
137 * This is the body of the worker thread. The argument is a pointer to
138 * its working context, kept in a list for each pool.
139 *
140 * @param[in] arg thread context
141 */
142
static void *
work_pool_thread(void *arg)
{
	struct work_pool_thread *wpt = arg;
	struct work_pool *pool = wpt->pool;
	struct poolq_entry *have;
	struct timespec ts;
	int rc;
	bool spawn;

	rcu_register_thread();

	/* Register this worker with the pool under the queue mutex;
	 * the mutex is held at the top of every loop iteration below.
	 */
	pthread_cond_init(&wpt->pqcond, NULL);
	pthread_mutex_lock(&pool->pqh.qmutex);
	TAILQ_INSERT_TAIL(&pool->wptqh, wpt, wptq);

	/* worker name: first 5 chars of pool name + monotonic index */
	wpt->worker_index = atomic_inc_uint32_t(&pool->worker_index);
	snprintf(wpt->worker_name, sizeof(wpt->worker_name), "%.5s%" PRIu32,
		 pool->name, wpt->worker_index);
	__ntirpc_pkg_params.thread_name_(wpt->worker_name);

	do {
		/* testing at top of loop allows pre-specification of work,
		 * and thread termination after timeout with no work (below).
		 */
		if (wpt->work) {
			wpt->work->wpt = wpt;
			/* qcount < thrd_min means fewer idle workers than the
			 * floor; grow the pool (up to thrd_max) before running
			 * the task, so submissions are not starved.
			 * n_threads is bumped while still holding the mutex.
			 */
			spawn = pool->pqh.qcount < pool->params.thrd_min
				&& pool->n_threads < pool->params.thrd_max;
			if (spawn)
				pool->n_threads++;
			pthread_mutex_unlock(&pool->pqh.qmutex);

			if (spawn) {
				/* busy, so dynamically add another thread */
				(void)work_pool_spawn(pool);
			}

			/* run the task outside the mutex */
			__warnx(TIRPC_DEBUG_FLAG_WORKER,
				"%s() %s task %p",
				__func__, wpt->worker_name, wpt->work);
			wpt->work->fun(wpt->work);
			wpt->work = NULL;
			pthread_mutex_lock(&pool->pqh.qmutex);
		}

		/* qcount sign convention: negative while tasks are queued,
		 * positive while idle workers are queued (see below).
		 */
		if (0 > pool->pqh.qcount++) {
			/* negative for task(s) */
			have = TAILQ_FIRST(&pool->pqh.qh);
			TAILQ_REMOVE(&pool->pqh.qh, have, q);

			wpt->work = (struct work_pool_entry *)have;
			continue;
		}

		/* positive for waiting worker(s):
		 * use the otherwise empty pool to hold them,
		 * simplifying mutex and pointer setup.
		 */
		TAILQ_INSERT_TAIL(&pool->pqh.qh, &wpt->pqe, q);

		__warnx(TIRPC_DEBUG_FLAG_WORKER,
			"%s() %s waiting",
			__func__, wpt->worker_name);

		clock_gettime(CLOCK_REALTIME_FAST, &ts);
		timespec_addms(&ts, pool->timeout_ms);

		/* Note: the mutex is the pool _head,
		 * but the condition is per worker,
		 * making the signal efficient!
		 */
		rc = pthread_cond_timedwait(&wpt->pqcond, &pool->pqh.qmutex,
					    &ts);
		if (!wpt->work) {
			/* Allow for possible timing race:
			 * work entry can be submitted by another
			 * thread during the thread task switch
			 * after shutdown or timeout?
			 * Then, has already been removed there.
			 */
			/* not signaled with work: undo our own enqueue
			 * (work_pool_submit decrements and removes when it
			 * hands work to a waiting worker).
			 */
			pool->pqh.qcount--;
			TAILQ_REMOVE(&pool->pqh.qh, &wpt->pqe, q);
		}
		if (rc && rc != ETIMEDOUT) {
			/* unexpected wait failure: bail out of the loop */
			__warnx(TIRPC_DEBUG_FLAG_ERROR,
				"%s() cond_timedwait failed (%d)\n",
				__func__, rc);
			break;
		}
	/* keep running while we hold work, or while exiting would
	 * drop the idle-worker count below thrd_min
	 */
	} while (wpt->work || pool->pqh.qcount < pool->params.thrd_min);

	/* deregister and self-destruct (threads are detached) */
	pool->n_threads--;
	TAILQ_REMOVE(&pool->wptqh, wpt, wptq);
	pthread_mutex_unlock(&pool->pqh.qmutex);

	__warnx(TIRPC_DEBUG_FLAG_WORKER,
		"%s() %s terminating",
		__func__, wpt->worker_name);
	cond_destroy(&wpt->pqcond);
	mem_free(wpt, sizeof(*wpt));
	rcu_unregister_thread();

	return (NULL);
}
248
249 static int
250 work_pool_spawn(struct work_pool *pool)
251 {
252 int rc;
253 struct work_pool_thread *wpt = mem_zalloc(sizeof(*wpt));
254
255 wpt->pool = pool;
256
257 rc = pthread_create(&wpt->pt, &pool->attr, work_pool_thread, wpt);
258 if (rc) {
259 __warnx(TIRPC_DEBUG_FLAG_ERROR,
260 "%s() pthread_create failed (%d)\n",
261 __func__, rc);
262 return rc;
263 }
264
265 return (0);
266 }
267
268 int
269 work_pool_submit(struct work_pool *pool, struct work_pool_entry *work)
270 {
271 int rc = 0;
272
273 if (unlikely(!pool->params.thrd_max)) {
274 /* queue is draining */
275 return (0);
276 }
277 pthread_mutex_lock(&pool->pqh.qmutex);
278
279 if (0 < pool->pqh.qcount--) {
280 struct work_pool_thread *wpt = (struct work_pool_thread *)
281 TAILQ_FIRST(&pool->pqh.qh);
282
283 /* positive for waiting worker(s) */
284 TAILQ_REMOVE(&pool->pqh.qh, &wpt->pqe, q);
285 wpt->work = work;
286
287 /* Note: the mutex is the pool _head,
288 * but the condition is per worker,
289 * making the signal efficient!
290 */
291 pthread_cond_signal(&wpt->pqcond);
292 } else {
293 /* negative for task(s) */
294 TAILQ_INSERT_TAIL(&pool->pqh.qh, &work->pqe, q);
295 }
296
297 pthread_mutex_unlock(&pool->pqh.qmutex);
298 return rc;
299 }
300
301 int
302 work_pool_shutdown(struct work_pool *pool)
303 {
304 struct work_pool_thread *wpt;
305 struct timespec ts = {
306 .tv_sec = 0,
307 .tv_nsec = 3000,
308 };
309
310 pthread_mutex_lock(&pool->pqh.qmutex);
311 pool->timeout_ms = 1;
312 pool->params.thrd_max =
313 pool->params.thrd_min = 0;
314
315 wpt = TAILQ_FIRST(&pool->wptqh);
316 while (wpt) {
317 pthread_cond_signal(&wpt->pqcond);
318 wpt = TAILQ_NEXT(wpt, wptq);
319 }
320
321 while (pool->n_threads > 0) {
322 pthread_mutex_unlock(&pool->pqh.qmutex);
323 __warnx(TIRPC_DEBUG_FLAG_WORKER,
324 "%s() \"%s\" %" PRIu32,
325 __func__, pool->name, pool->n_threads);
326 nanosleep(&ts, NULL);
327 pthread_mutex_lock(&pool->pqh.qmutex);
328 }
329 pthread_mutex_unlock(&pool->pqh.qmutex);
330
331 mem_free(pool->name, 0);
332 poolq_head_destroy(&pool->pqh);
333
334 return (0);
335 }
336