1 | /* Handle general operations. |
2 | Copyright (C) 1997-2023 Free Software Foundation, Inc. |
3 | This file is part of the GNU C Library. |
4 | |
5 | The GNU C Library is free software; you can redistribute it and/or |
6 | modify it under the terms of the GNU Lesser General Public |
7 | License as published by the Free Software Foundation; either |
8 | version 2.1 of the License, or (at your option) any later version. |
9 | |
10 | The GNU C Library is distributed in the hope that it will be useful, |
11 | but WITHOUT ANY WARRANTY; without even the implied warranty of |
12 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU |
13 | Lesser General Public License for more details. |
14 | |
15 | You should have received a copy of the GNU Lesser General Public |
16 | License along with the GNU C Library; if not, see |
17 | <https://www.gnu.org/licenses/>. */ |
18 | |
19 | #include <aio.h> |
20 | #include <assert.h> |
21 | #include <errno.h> |
22 | #include <limits.h> |
23 | #include <pthreadP.h> |
24 | #include <stdlib.h> |
25 | #include <unistd.h> |
26 | #include <sys/param.h> |
27 | #include <sys/stat.h> |
28 | #include <sys/time.h> |
29 | #include <aio_misc.h> |
30 | |
31 | #if !PTHREAD_IN_LIBC |
32 | /* The available function names differ outside of libc. (In libc, we |
33 | need to use hidden aliases to avoid the PLT.) */ |
34 | # define __pread __libc_pread |
35 | # define __pthread_attr_destroy pthread_attr_destroy |
36 | # define __pthread_attr_init pthread_attr_init |
37 | # define __pthread_attr_setdetachstate pthread_attr_setdetachstate |
38 | # define __pthread_cond_signal pthread_cond_signal |
39 | # define __pthread_cond_timedwait pthread_cond_timedwait |
40 | # define __pthread_getschedparam pthread_getschedparam |
41 | # define __pthread_setschedparam pthread_setschedparam |
42 | # define __pwrite __libc_pwrite |
43 | #endif |
44 | |
45 | #ifndef aio_create_helper_thread |
46 | # define aio_create_helper_thread __aio_create_helper_thread |
47 | |
/* Start TF (ARG) in a new detached thread and store the thread ID in
   *THREADP.  The value returned is whatever __pthread_create returned;
   callers treat zero as success.  */
extern inline int
__aio_create_helper_thread (pthread_t *threadp, void *(*tf) (void *), void *arg)
{
  pthread_attr_t detached_attr;
  int status;

  /* The helper has to be created in the detached state.  */
  __pthread_attr_init (&detached_attr);
  __pthread_attr_setdetachstate (&detached_attr, PTHREAD_CREATE_DETACHED);

  status = __pthread_create (threadp, &detached_attr, tf, arg);

  /* The attribute object is no longer needed once creation was tried.  */
  __pthread_attr_destroy (&detached_attr);

  return status;
}
62 | #endif |
63 | |
64 | static void add_request_to_runlist (struct requestlist *newrequest); |
65 | |
66 | /* Pool of request list entries. */ |
67 | static struct requestlist **pool; |
68 | |
69 | /* Number of total and allocated pool entries. */ |
70 | static size_t pool_max_size; |
71 | static size_t pool_size; |
72 | |
/* We implement a two dimensional array but allocate each row separately.
   The macro below determines how many entries are used per row.  It
   must be a power of two: __aio_init asserts this and rounds aio_num
   down by masking with ENTRIES_PER_ROW - 1.  */
76 | #define ENTRIES_PER_ROW 32 |
77 | |
78 | /* How many rows we allocate at once. */ |
79 | #define ROWS_STEP 8 |
80 | |
81 | /* List of available entries. */ |
82 | static struct requestlist *freelist; |
83 | |
/* List of requests waiting to be processed.  */
85 | static struct requestlist *runlist; |
86 | |
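/* List of all requests currently known, sorted by file descriptor and
   linked through next_fd/last_fd.  Further requests for the same
   descriptor hang off each entry through next_prio.  */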
88 | static struct requestlist *requests; |
89 | |
90 | /* Number of threads currently running. */ |
91 | static int nthreads; |
92 | |
93 | /* Number of threads waiting for work to arrive. */ |
94 | static int idle_thread_count; |
95 | |
96 | |
97 | /* These are the values used to optimize the use of AIO. The user can |
98 | overwrite them by using the `aio_init' function. */ |
99 | static struct aioinit optim = |
100 | { |
101 | 20, /* int aio_threads; Maximal number of threads. */ |
102 | 64, /* int aio_num; Number of expected simultaneous requests. */ |
103 | 0, |
104 | 0, |
105 | 0, |
106 | 0, |
107 | 1, |
108 | 0 |
109 | }; |
110 | |
111 | |
112 | /* Since the list is global we need a mutex protecting it. */ |
113 | pthread_mutex_t __aio_requests_mutex = PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP; |
114 | |
115 | /* When you add a request to the list and there are idle threads present, |
116 | you signal this condition variable. When a thread finishes work, it waits |
117 | on this condition variable for a time before it actually exits. */ |
118 | pthread_cond_t __aio_new_request_notification = PTHREAD_COND_INITIALIZER; |
119 | |
120 | |
121 | /* Functions to handle request list pool. */ |
122 | static struct requestlist * |
123 | get_elem (void) |
124 | { |
125 | struct requestlist *result; |
126 | |
127 | if (freelist == NULL) |
128 | { |
129 | struct requestlist *new_row; |
130 | int cnt; |
131 | |
132 | assert (sizeof (struct aiocb) == sizeof (struct aiocb64)); |
133 | |
134 | if (pool_size + 1 >= pool_max_size) |
135 | { |
136 | size_t new_max_size = pool_max_size + ROWS_STEP; |
137 | struct requestlist **new_tab; |
138 | |
139 | new_tab = (struct requestlist **) |
140 | realloc (pool, new_max_size * sizeof (struct requestlist *)); |
141 | |
142 | if (new_tab == NULL) |
143 | return NULL; |
144 | |
145 | pool_max_size = new_max_size; |
146 | pool = new_tab; |
147 | } |
148 | |
149 | /* Allocate the new row. */ |
150 | cnt = pool_size == 0 ? optim.aio_num : ENTRIES_PER_ROW; |
151 | new_row = (struct requestlist *) calloc (cnt, |
152 | sizeof (struct requestlist)); |
153 | if (new_row == NULL) |
154 | return NULL; |
155 | |
156 | pool[pool_size++] = new_row; |
157 | |
158 | /* Put all the new entries in the freelist. */ |
159 | do |
160 | { |
161 | new_row->next_prio = freelist; |
162 | freelist = new_row++; |
163 | } |
164 | while (--cnt > 0); |
165 | } |
166 | |
167 | result = freelist; |
168 | freelist = freelist->next_prio; |
169 | |
170 | return result; |
171 | } |
172 | |
173 | |
174 | void |
175 | __aio_free_request (struct requestlist *elem) |
176 | { |
177 | elem->running = no; |
178 | elem->next_prio = freelist; |
179 | freelist = elem; |
180 | } |
181 | |
182 | |
183 | struct requestlist * |
184 | __aio_find_req (aiocb_union *elem) |
185 | { |
186 | struct requestlist *runp = requests; |
187 | int fildes = elem->aiocb.aio_fildes; |
188 | |
189 | while (runp != NULL && runp->aiocbp->aiocb.aio_fildes < fildes) |
190 | runp = runp->next_fd; |
191 | |
192 | if (runp != NULL) |
193 | { |
194 | if (runp->aiocbp->aiocb.aio_fildes != fildes) |
195 | runp = NULL; |
196 | else |
197 | while (runp != NULL && runp->aiocbp != elem) |
198 | runp = runp->next_prio; |
199 | } |
200 | |
201 | return runp; |
202 | } |
203 | |
204 | |
205 | struct requestlist * |
206 | __aio_find_req_fd (int fildes) |
207 | { |
208 | struct requestlist *runp = requests; |
209 | |
210 | while (runp != NULL && runp->aiocbp->aiocb.aio_fildes < fildes) |
211 | runp = runp->next_fd; |
212 | |
213 | return (runp != NULL && runp->aiocbp->aiocb.aio_fildes == fildes |
214 | ? runp : NULL); |
215 | } |
216 | |
217 | |
218 | void |
219 | __aio_remove_request (struct requestlist *last, struct requestlist *req, |
220 | int all) |
221 | { |
222 | assert (req->running == yes || req->running == queued |
223 | || req->running == done); |
224 | |
225 | if (last != NULL) |
226 | last->next_prio = all ? NULL : req->next_prio; |
227 | else |
228 | { |
229 | if (all || req->next_prio == NULL) |
230 | { |
231 | if (req->last_fd != NULL) |
232 | req->last_fd->next_fd = req->next_fd; |
233 | else |
234 | requests = req->next_fd; |
235 | if (req->next_fd != NULL) |
236 | req->next_fd->last_fd = req->last_fd; |
237 | } |
238 | else |
239 | { |
240 | if (req->last_fd != NULL) |
241 | req->last_fd->next_fd = req->next_prio; |
242 | else |
243 | requests = req->next_prio; |
244 | |
245 | if (req->next_fd != NULL) |
246 | req->next_fd->last_fd = req->next_prio; |
247 | |
248 | req->next_prio->last_fd = req->last_fd; |
249 | req->next_prio->next_fd = req->next_fd; |
250 | |
251 | /* Mark this entry as runnable. */ |
252 | req->next_prio->running = yes; |
253 | } |
254 | |
255 | if (req->running == yes) |
256 | { |
257 | struct requestlist *runp = runlist; |
258 | |
259 | last = NULL; |
260 | while (runp != NULL) |
261 | { |
262 | if (runp == req) |
263 | { |
264 | if (last == NULL) |
265 | runlist = runp->next_run; |
266 | else |
267 | last->next_run = runp->next_run; |
268 | break; |
269 | } |
270 | last = runp; |
271 | runp = runp->next_run; |
272 | } |
273 | } |
274 | } |
275 | } |
276 | |
277 | |
278 | /* The thread handler. */ |
279 | static void *handle_fildes_io (void *arg); |
280 | |
281 | |
282 | /* User optimization. */ |
283 | void |
284 | __aio_init (const struct aioinit *init) |
285 | { |
286 | /* Get the mutex. */ |
287 | __pthread_mutex_lock (&__aio_requests_mutex); |
288 | |
289 | /* Only allow writing new values if the table is not yet allocated. */ |
290 | if (pool == NULL) |
291 | { |
292 | optim.aio_threads = init->aio_threads < 1 ? 1 : init->aio_threads; |
293 | assert (powerof2 (ENTRIES_PER_ROW)); |
294 | optim.aio_num = (init->aio_num < ENTRIES_PER_ROW |
295 | ? ENTRIES_PER_ROW |
296 | : init->aio_num & ~(ENTRIES_PER_ROW - 1)); |
297 | } |
298 | |
299 | if (init->aio_idle_time != 0) |
300 | optim.aio_idle_time = init->aio_idle_time; |
301 | |
302 | /* Release the mutex. */ |
303 | __pthread_mutex_unlock (&__aio_requests_mutex); |
304 | } |
305 | |
306 | |
307 | /* The main function of the async I/O handling. It enqueues requests |
308 | and if necessary starts and handles threads. */ |
309 | struct requestlist * |
310 | __aio_enqueue_request (aiocb_union *aiocbp, int operation) |
311 | { |
312 | int result = 0; |
313 | int policy, prio; |
314 | struct sched_param param; |
315 | struct requestlist *last, *runp, *newp; |
316 | int running = no; |
317 | |
318 | if (operation == LIO_SYNC || operation == LIO_DSYNC) |
319 | aiocbp->aiocb.aio_reqprio = 0; |
320 | else if (aiocbp->aiocb.aio_reqprio < 0 |
321 | #ifdef AIO_PRIO_DELTA_MAX |
322 | || aiocbp->aiocb.aio_reqprio > AIO_PRIO_DELTA_MAX |
323 | #endif |
324 | ) |
325 | { |
326 | /* Invalid priority value. */ |
327 | __set_errno (EINVAL); |
328 | aiocbp->aiocb.__error_code = EINVAL; |
329 | aiocbp->aiocb.__return_value = -1; |
330 | return NULL; |
331 | } |
332 | |
333 | /* Compute priority for this request. */ |
334 | __pthread_getschedparam (__pthread_self (), &policy, ¶m); |
335 | prio = param.sched_priority - aiocbp->aiocb.aio_reqprio; |
336 | |
337 | /* Get the mutex. */ |
338 | __pthread_mutex_lock (&__aio_requests_mutex); |
339 | |
340 | last = NULL; |
341 | runp = requests; |
342 | /* First look whether the current file descriptor is currently |
343 | worked with. */ |
344 | while (runp != NULL |
345 | && runp->aiocbp->aiocb.aio_fildes < aiocbp->aiocb.aio_fildes) |
346 | { |
347 | last = runp; |
348 | runp = runp->next_fd; |
349 | } |
350 | |
351 | /* Get a new element for the waiting list. */ |
352 | newp = get_elem (); |
353 | if (newp == NULL) |
354 | { |
355 | __pthread_mutex_unlock (&__aio_requests_mutex); |
356 | __set_errno (EAGAIN); |
357 | return NULL; |
358 | } |
359 | newp->aiocbp = aiocbp; |
360 | newp->waiting = NULL; |
361 | |
362 | aiocbp->aiocb.__abs_prio = prio; |
363 | aiocbp->aiocb.__policy = policy; |
364 | aiocbp->aiocb.aio_lio_opcode = operation; |
365 | aiocbp->aiocb.__error_code = EINPROGRESS; |
366 | aiocbp->aiocb.__return_value = 0; |
367 | |
368 | if (runp != NULL |
369 | && runp->aiocbp->aiocb.aio_fildes == aiocbp->aiocb.aio_fildes) |
370 | { |
371 | /* The current file descriptor is worked on. It makes no sense |
372 | to start another thread since this new thread would fight |
373 | with the running thread for the resources. But we also cannot |
374 | say that the thread processing this descriptor shall immediately |
375 | after finishing the current job process this request if there |
376 | are other threads in the running queue which have a higher |
377 | priority. */ |
378 | |
379 | /* Simply enqueue it after the running one according to the |
380 | priority. */ |
381 | last = NULL; |
382 | while (runp->next_prio != NULL |
383 | && runp->next_prio->aiocbp->aiocb.__abs_prio >= prio) |
384 | { |
385 | last = runp; |
386 | runp = runp->next_prio; |
387 | } |
388 | |
389 | newp->next_prio = runp->next_prio; |
390 | runp->next_prio = newp; |
391 | |
392 | running = queued; |
393 | } |
394 | else |
395 | { |
396 | running = yes; |
397 | /* Enqueue this request for a new descriptor. */ |
398 | if (last == NULL) |
399 | { |
400 | newp->last_fd = NULL; |
401 | newp->next_fd = requests; |
402 | if (requests != NULL) |
403 | requests->last_fd = newp; |
404 | requests = newp; |
405 | } |
406 | else |
407 | { |
408 | newp->next_fd = last->next_fd; |
409 | newp->last_fd = last; |
410 | last->next_fd = newp; |
411 | if (newp->next_fd != NULL) |
412 | newp->next_fd->last_fd = newp; |
413 | } |
414 | |
415 | newp->next_prio = NULL; |
416 | last = NULL; |
417 | } |
418 | |
419 | if (running == yes) |
420 | { |
421 | /* We try to create a new thread for this file descriptor. The |
422 | function which gets called will handle all available requests |
423 | for this descriptor and when all are processed it will |
424 | terminate. |
425 | |
426 | If no new thread can be created or if the specified limit of |
427 | threads for AIO is reached we queue the request. */ |
428 | |
429 | /* See if we need to and are able to create a thread. */ |
430 | if (nthreads < optim.aio_threads && idle_thread_count == 0) |
431 | { |
432 | pthread_t thid; |
433 | |
434 | running = newp->running = allocated; |
435 | |
436 | /* Now try to start a thread. */ |
437 | result = aio_create_helper_thread (&thid, handle_fildes_io, newp); |
438 | if (result == 0) |
439 | /* We managed to enqueue the request. All errors which can |
440 | happen now can be recognized by calls to `aio_return' and |
441 | `aio_error'. */ |
442 | ++nthreads; |
443 | else |
444 | { |
445 | /* Reset the running flag. The new request is not running. */ |
446 | running = newp->running = yes; |
447 | |
448 | if (nthreads == 0) |
449 | { |
450 | /* We cannot create a thread in the moment and there is |
451 | also no thread running. This is a problem. `errno' is |
452 | set to EAGAIN if this is only a temporary problem. */ |
453 | __aio_remove_request (last, newp, 0); |
454 | } |
455 | else |
456 | result = 0; |
457 | } |
458 | } |
459 | } |
460 | |
461 | /* Enqueue the request in the run queue if it is not yet running. */ |
462 | if (running == yes && result == 0) |
463 | { |
464 | add_request_to_runlist (newp); |
465 | |
466 | /* If there is a thread waiting for work, then let it know that we |
467 | have just given it something to do. */ |
468 | if (idle_thread_count > 0) |
469 | __pthread_cond_signal (&__aio_new_request_notification); |
470 | } |
471 | |
472 | if (result == 0) |
473 | newp->running = running; |
474 | else |
475 | { |
476 | /* Something went wrong. */ |
477 | __aio_free_request (newp); |
478 | aiocbp->aiocb.__error_code = result; |
479 | __set_errno (result); |
480 | newp = NULL; |
481 | } |
482 | |
483 | /* Release the mutex. */ |
484 | __pthread_mutex_unlock (&__aio_requests_mutex); |
485 | |
486 | return newp; |
487 | } |
488 | |
489 | |
490 | static void * |
491 | handle_fildes_io (void *arg) |
492 | { |
493 | pthread_t self = __pthread_self (); |
494 | struct sched_param param; |
495 | struct requestlist *runp = (struct requestlist *) arg; |
496 | aiocb_union *aiocbp; |
497 | int policy; |
498 | int fildes; |
499 | |
500 | __pthread_getschedparam (self, &policy, ¶m); |
501 | |
502 | do |
503 | { |
504 | /* If runp is NULL, then we were created to service the work queue |
505 | in general, not to handle any particular request. In that case we |
506 | skip the "do work" stuff on the first pass, and go directly to the |
507 | "get work off the work queue" part of this loop, which is near the |
508 | end. */ |
509 | if (runp == NULL) |
510 | __pthread_mutex_lock (&__aio_requests_mutex); |
511 | else |
512 | { |
513 | /* Hopefully this request is marked as running. */ |
514 | assert (runp->running == allocated); |
515 | |
516 | /* Update our variables. */ |
517 | aiocbp = runp->aiocbp; |
518 | fildes = aiocbp->aiocb.aio_fildes; |
519 | |
520 | /* Change the priority to the requested value (if necessary). */ |
521 | if (aiocbp->aiocb.__abs_prio != param.sched_priority |
522 | || aiocbp->aiocb.__policy != policy) |
523 | { |
524 | param.sched_priority = aiocbp->aiocb.__abs_prio; |
525 | policy = aiocbp->aiocb.__policy; |
526 | __pthread_setschedparam (self, policy, ¶m); |
527 | } |
528 | |
529 | /* Process request pointed to by RUNP. We must not be disturbed |
530 | by signals. */ |
531 | if ((aiocbp->aiocb.aio_lio_opcode & 127) == LIO_READ) |
532 | { |
533 | if (sizeof (off_t) != sizeof (off64_t) |
534 | && aiocbp->aiocb.aio_lio_opcode & 128) |
535 | aiocbp->aiocb.__return_value = |
536 | TEMP_FAILURE_RETRY (__pread64 (fildes, (void *) |
537 | aiocbp->aiocb64.aio_buf, |
538 | aiocbp->aiocb64.aio_nbytes, |
539 | aiocbp->aiocb64.aio_offset)); |
540 | else |
541 | aiocbp->aiocb.__return_value = |
542 | TEMP_FAILURE_RETRY (__pread (fildes, |
543 | (void *) |
544 | aiocbp->aiocb.aio_buf, |
545 | aiocbp->aiocb.aio_nbytes, |
546 | aiocbp->aiocb.aio_offset)); |
547 | |
548 | if (aiocbp->aiocb.__return_value == -1 && errno == ESPIPE) |
549 | /* The Linux kernel is different from others. It returns |
550 | ESPIPE if using pread on a socket. Other platforms |
551 | simply ignore the offset parameter and behave like |
552 | read. */ |
553 | aiocbp->aiocb.__return_value = |
554 | TEMP_FAILURE_RETRY (read (fildes, |
555 | (void *) aiocbp->aiocb64.aio_buf, |
556 | aiocbp->aiocb64.aio_nbytes)); |
557 | } |
558 | else if ((aiocbp->aiocb.aio_lio_opcode & 127) == LIO_WRITE) |
559 | { |
560 | if (sizeof (off_t) != sizeof (off64_t) |
561 | && aiocbp->aiocb.aio_lio_opcode & 128) |
562 | aiocbp->aiocb.__return_value = |
563 | TEMP_FAILURE_RETRY (__pwrite64 (fildes, (const void *) |
564 | aiocbp->aiocb64.aio_buf, |
565 | aiocbp->aiocb64.aio_nbytes, |
566 | aiocbp->aiocb64.aio_offset)); |
567 | else |
568 | aiocbp->aiocb.__return_value = |
569 | TEMP_FAILURE_RETRY (__pwrite (fildes, (const void *) |
570 | aiocbp->aiocb.aio_buf, |
571 | aiocbp->aiocb.aio_nbytes, |
572 | aiocbp->aiocb.aio_offset)); |
573 | |
574 | if (aiocbp->aiocb.__return_value == -1 && errno == ESPIPE) |
575 | /* The Linux kernel is different from others. It returns |
576 | ESPIPE if using pwrite on a socket. Other platforms |
577 | simply ignore the offset parameter and behave like |
578 | write. */ |
579 | aiocbp->aiocb.__return_value = |
580 | TEMP_FAILURE_RETRY (write (fildes, |
581 | (void *) aiocbp->aiocb64.aio_buf, |
582 | aiocbp->aiocb64.aio_nbytes)); |
583 | } |
584 | else if (aiocbp->aiocb.aio_lio_opcode == LIO_DSYNC) |
585 | aiocbp->aiocb.__return_value = |
586 | TEMP_FAILURE_RETRY (fdatasync (fildes)); |
587 | else if (aiocbp->aiocb.aio_lio_opcode == LIO_SYNC) |
588 | aiocbp->aiocb.__return_value = |
589 | TEMP_FAILURE_RETRY (fsync (fildes)); |
590 | else |
591 | { |
592 | /* This is an invalid opcode. */ |
593 | aiocbp->aiocb.__return_value = -1; |
594 | __set_errno (EINVAL); |
595 | } |
596 | |
597 | /* Get the mutex. */ |
598 | __pthread_mutex_lock (&__aio_requests_mutex); |
599 | |
600 | if (aiocbp->aiocb.__return_value == -1) |
601 | aiocbp->aiocb.__error_code = errno; |
602 | else |
603 | aiocbp->aiocb.__error_code = 0; |
604 | |
605 | /* Send the signal to notify about finished processing of the |
606 | request. */ |
607 | __aio_notify (runp); |
608 | |
609 | /* For debugging purposes we reset the running flag of the |
610 | finished request. */ |
611 | assert (runp->running == allocated); |
612 | runp->running = done; |
613 | |
614 | /* Now dequeue the current request. */ |
615 | __aio_remove_request (NULL, runp, 0); |
616 | if (runp->next_prio != NULL) |
617 | add_request_to_runlist (runp->next_prio); |
618 | |
619 | /* Free the old element. */ |
620 | __aio_free_request (runp); |
621 | } |
622 | |
623 | runp = runlist; |
624 | |
625 | /* If the runlist is empty, then we sleep for a while, waiting for |
626 | something to arrive in it. */ |
627 | if (runp == NULL && optim.aio_idle_time >= 0) |
628 | { |
629 | struct timespec now; |
630 | struct timespec wakeup_time; |
631 | |
632 | ++idle_thread_count; |
633 | __clock_gettime (CLOCK_REALTIME, &now); |
634 | wakeup_time.tv_sec = now.tv_sec + optim.aio_idle_time; |
635 | wakeup_time.tv_nsec = now.tv_nsec; |
636 | if (wakeup_time.tv_nsec >= 1000000000) |
637 | { |
638 | wakeup_time.tv_nsec -= 1000000000; |
639 | ++wakeup_time.tv_sec; |
640 | } |
641 | __pthread_cond_timedwait (&__aio_new_request_notification, |
642 | &__aio_requests_mutex, |
643 | &wakeup_time); |
644 | --idle_thread_count; |
645 | runp = runlist; |
646 | } |
647 | |
648 | if (runp == NULL) |
649 | --nthreads; |
650 | else |
651 | { |
652 | assert (runp->running == yes); |
653 | runp->running = allocated; |
654 | runlist = runp->next_run; |
655 | |
656 | /* If we have a request to process, and there's still another in |
657 | the run list, then we need to either wake up or create a new |
658 | thread to service the request that is still in the run list. */ |
659 | if (runlist != NULL) |
660 | { |
661 | /* There are at least two items in the work queue to work on. |
662 | If there are other idle threads, then we should wake them |
663 | up for these other work elements; otherwise, we should try |
664 | to create a new thread. */ |
665 | if (idle_thread_count > 0) |
666 | __pthread_cond_signal (&__aio_new_request_notification); |
667 | else if (nthreads < optim.aio_threads) |
668 | { |
669 | pthread_t thid; |
670 | pthread_attr_t attr; |
671 | |
672 | /* Make sure the thread is created detached. */ |
673 | __pthread_attr_init (&attr); |
674 | __pthread_attr_setdetachstate (&attr, |
675 | PTHREAD_CREATE_DETACHED); |
676 | |
677 | /* Now try to start a thread. If we fail, no big deal, |
678 | because we know that there is at least one thread (us) |
679 | that is working on AIO operations. */ |
680 | if (__pthread_create (&thid, &attr, handle_fildes_io, NULL) |
681 | == 0) |
682 | ++nthreads; |
683 | } |
684 | } |
685 | } |
686 | |
687 | /* Release the mutex. */ |
688 | __pthread_mutex_unlock (&__aio_requests_mutex); |
689 | } |
690 | while (runp != NULL); |
691 | |
692 | return NULL; |
693 | } |
694 | |
695 | |
696 | /* Free allocated resources. */ |
697 | #if !PTHREAD_IN_LIBC |
698 | __attribute__ ((__destructor__)) static |
699 | #endif |
700 | void |
701 | __aio_freemem (void) |
702 | { |
703 | size_t row; |
704 | |
705 | for (row = 0; row < pool_size; ++row) |
706 | free (pool[row]); |
707 | |
708 | free (pool); |
709 | } |
710 | |
711 | |
712 | /* Add newrequest to the runlist. The __abs_prio flag of newrequest must |
713 | be correctly set to do this. Also, you had better set newrequest's |
714 | "running" flag to "yes" before you release your lock or you'll throw an |
715 | assertion. */ |
716 | static void |
717 | add_request_to_runlist (struct requestlist *newrequest) |
718 | { |
719 | int prio = newrequest->aiocbp->aiocb.__abs_prio; |
720 | struct requestlist *runp; |
721 | |
722 | if (runlist == NULL || runlist->aiocbp->aiocb.__abs_prio < prio) |
723 | { |
724 | newrequest->next_run = runlist; |
725 | runlist = newrequest; |
726 | } |
727 | else |
728 | { |
729 | runp = runlist; |
730 | |
731 | while (runp->next_run != NULL |
732 | && runp->next_run->aiocbp->aiocb.__abs_prio >= prio) |
733 | runp = runp->next_run; |
734 | |
735 | newrequest->next_run = runp->next_run; |
736 | runp->next_run = newrequest; |
737 | } |
738 | } |
739 | |
740 | #if PTHREAD_IN_LIBC |
741 | versioned_symbol (libc, __aio_init, aio_init, GLIBC_2_34); |
742 | # if OTHER_SHLIB_COMPAT (librt, GLIBC_2_1, GLIBC_2_34) |
743 | compat_symbol (librt, __aio_init, aio_init, GLIBC_2_1); |
744 | # endif |
745 | #else /* !PTHREAD_IN_LIBC */ |
746 | weak_alias (__aio_init, aio_init) |
747 | #endif /* !PTHREAD_IN_LIBC */ |
748 | |