/* Handle general operations.
   Copyright (C) 1997-2021 Free Software Foundation, Inc.
   This file is part of the GNU C Library.
   Contributed by Ulrich Drepper <drepper@cygnus.com>, 1997.

   The GNU C Library is free software; you can redistribute it and/or
   modify it under the terms of the GNU Lesser General Public
   License as published by the Free Software Foundation; either
   version 2.1 of the License, or (at your option) any later version.

   The GNU C Library is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
   Lesser General Public License for more details.

   You should have received a copy of the GNU Lesser General Public
   License along with the GNU C Library; if not, see
   <https://www.gnu.org/licenses/>.  */

#include <aio.h>
#include <assert.h>
#include <errno.h>
#include <limits.h>
#include <pthreadP.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/param.h>
#include <sys/stat.h>
#include <sys/time.h>
#include <aio_misc.h>

#if !PTHREAD_IN_LIBC
/* The available function names differ outside of libc.  (In libc, we
   need to use hidden aliases to avoid the PLT.)  */
# define __pread __libc_pread
# define __pthread_attr_destroy pthread_attr_destroy
# define __pthread_attr_init pthread_attr_init
# define __pthread_attr_setdetachstate pthread_attr_setdetachstate
# define __pthread_cond_signal pthread_cond_signal
# define __pthread_cond_timedwait pthread_cond_timedwait
# define __pthread_getschedparam pthread_getschedparam
# define __pthread_setschedparam pthread_setschedparam
# define __pwrite __libc_pwrite
#endif

#ifndef aio_create_helper_thread
# define aio_create_helper_thread __aio_create_helper_thread

extern inline int
__aio_create_helper_thread (pthread_t *threadp, void *(*tf) (void *),
                            void *arg)
{
  pthread_attr_t attr;

  /* Make sure the thread is created detached.  */
  __pthread_attr_init (&attr);
  __pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);

  int ret = __pthread_create (threadp, &attr, tf, arg);

  __pthread_attr_destroy (&attr);
  return ret;
}
#endif

static void add_request_to_runlist (struct requestlist *newrequest);

/* Pool of request list entries.  */
static struct requestlist **pool;

/* Number of total and allocated pool entries.  */
static size_t pool_max_size;
static size_t pool_size;

/* We implement a two-dimensional array but allocate each row separately.
   The macro below determines how many entries should be used per row.
   It should preferably be a power of two.  */
#define ENTRIES_PER_ROW 32

/* How many rows we allocate at once.  */
#define ROWS_STEP 8
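
/* With the defaults, the first row is sized for optim.aio_num entries
   (64 unless changed via aio_init) and every further row holds
   ENTRIES_PER_ROW entries, so the pool capacity grows as 64, 96, 128,
   ... requests (see get_elem below).  */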

/* List of available entries.  */
static struct requestlist *freelist;

/* List of requests waiting to be processed.  */
static struct requestlist *runlist;

/* List of all currently processed requests.  */
static struct requestlist *requests;
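
/* The `requests' list is effectively two-dimensional: entries for
   distinct descriptors are linked through `next_fd'/`last_fd' in
   ascending descriptor order, and all requests for one descriptor hang
   off its first entry through `next_prio' in descending priority
   order.  */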

/* Number of threads currently running.  */
static int nthreads;

/* Number of threads waiting for work to arrive.  */
static int idle_thread_count;


/* These are the values used to optimize the use of AIO.  The user can
   override them by using the `aio_init' function.  */
static struct aioinit optim =
{
  20,  /* int aio_threads;    Maximum number of threads.  */
  64,  /* int aio_num;        Number of expected simultaneous requests.  */
  0,   /* int aio_locks;      Not used.  */
  0,   /* int aio_usedba;     Not used.  */
  0,   /* int aio_debug;      Not used.  */
  0,   /* int aio_numusers;   Not used.  */
  1,   /* int aio_idle_time;  Seconds an idle thread waits before exiting.  */
  0    /* int aio_reserved;   Not used.  */
};


/* Since the list is global we need a mutex protecting it.  */
pthread_mutex_t __aio_requests_mutex = PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP;

/* When you add a request to the list and there are idle threads present,
   you signal this condition variable.  When a thread finishes work, it
   waits on this condition variable for a time before it actually exits.  */
pthread_cond_t __aio_new_request_notification = PTHREAD_COND_INITIALIZER;


/* Functions to handle request list pool.  */
static struct requestlist *
get_elem (void)
{
  struct requestlist *result;

  if (freelist == NULL)
    {
      struct requestlist *new_row;
      int cnt;

      assert (sizeof (struct aiocb) == sizeof (struct aiocb64));

      if (pool_size + 1 >= pool_max_size)
        {
          size_t new_max_size = pool_max_size + ROWS_STEP;
          struct requestlist **new_tab;

          new_tab = (struct requestlist **)
            realloc (pool, new_max_size * sizeof (struct requestlist *));

          if (new_tab == NULL)
            return NULL;

          pool_max_size = new_max_size;
          pool = new_tab;
        }

      /* Allocate the new row.  */
      cnt = pool_size == 0 ? optim.aio_num : ENTRIES_PER_ROW;
      new_row = (struct requestlist *) calloc (cnt,
                                               sizeof (struct requestlist));
      if (new_row == NULL)
        return NULL;

      pool[pool_size++] = new_row;

      /* Put all the new entries in the freelist.  */
      do
        {
          new_row->next_prio = freelist;
          freelist = new_row++;
        }
      while (--cnt > 0);
    }

  result = freelist;
  freelist = freelist->next_prio;

  return result;
}


void
__aio_free_request (struct requestlist *elem)
{
  elem->running = no;
  elem->next_prio = freelist;
  freelist = elem;
}


struct requestlist *
__aio_find_req (aiocb_union *elem)
{
  struct requestlist *runp = requests;
  int fildes = elem->aiocb.aio_fildes;

  while (runp != NULL && runp->aiocbp->aiocb.aio_fildes < fildes)
    runp = runp->next_fd;

  if (runp != NULL)
    {
      if (runp->aiocbp->aiocb.aio_fildes != fildes)
        runp = NULL;
      else
        while (runp != NULL && runp->aiocbp != elem)
          runp = runp->next_prio;
    }

  return runp;
}


struct requestlist *
__aio_find_req_fd (int fildes)
{
  struct requestlist *runp = requests;

  while (runp != NULL && runp->aiocbp->aiocb.aio_fildes < fildes)
    runp = runp->next_fd;

  return (runp != NULL && runp->aiocbp->aiocb.aio_fildes == fildes
          ? runp : NULL);
}


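/* Detach REQ from the bookkeeping lists.  LAST, if not NULL, is REQ's
   predecessor on the per-descriptor priority chain.  If ALL is nonzero
   the whole remainder of that chain is cut off together with REQ;
   otherwise only REQ is removed and its successor, if any, is promoted
   into the descriptor list in its place.  If REQ headed its chain and
   is still marked running, it is also unlinked from the run list.  */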
void
__aio_remove_request (struct requestlist *last, struct requestlist *req,
                      int all)
{
  assert (req->running == yes || req->running == queued
          || req->running == done);

  if (last != NULL)
    last->next_prio = all ? NULL : req->next_prio;
  else
    {
      if (all || req->next_prio == NULL)
        {
          if (req->last_fd != NULL)
            req->last_fd->next_fd = req->next_fd;
          else
            requests = req->next_fd;
          if (req->next_fd != NULL)
            req->next_fd->last_fd = req->last_fd;
        }
      else
        {
          if (req->last_fd != NULL)
            req->last_fd->next_fd = req->next_prio;
          else
            requests = req->next_prio;

          if (req->next_fd != NULL)
            req->next_fd->last_fd = req->next_prio;

          req->next_prio->last_fd = req->last_fd;
          req->next_prio->next_fd = req->next_fd;

          /* Mark this entry as runnable.  */
          req->next_prio->running = yes;
        }

      if (req->running == yes)
        {
          struct requestlist *runp = runlist;

          last = NULL;
          while (runp != NULL)
            {
              if (runp == req)
                {
                  if (last == NULL)
                    runlist = runp->next_run;
                  else
                    last->next_run = runp->next_run;
                  break;
                }
              last = runp;
              runp = runp->next_run;
            }
        }
    }
}


/* The thread handler.  */
static void *handle_fildes_io (void *arg);


/* User optimization.  */
void
__aio_init (const struct aioinit *init)
{
  /* Get the mutex.  */
  __pthread_mutex_lock (&__aio_requests_mutex);

  /* Only allow writing new values if the table is not yet allocated.  */
  if (pool == NULL)
    {
      optim.aio_threads = init->aio_threads < 1 ? 1 : init->aio_threads;
      assert (powerof2 (ENTRIES_PER_ROW));
      optim.aio_num = (init->aio_num < ENTRIES_PER_ROW
                       ? ENTRIES_PER_ROW
                       : init->aio_num & ~(ENTRIES_PER_ROW - 1));
    }

  if (init->aio_idle_time != 0)
    optim.aio_idle_time = init->aio_idle_time;

  /* Release the mutex.  */
  __pthread_mutex_unlock (&__aio_requests_mutex);
}
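
/* For illustration only: a program could tune these limits before
   issuing its first request (the values shown are arbitrary):

     struct aioinit init = { 0 };
     init.aio_threads = 32;
     init.aio_num = 256;
     aio_init (&init);

   aio_num is rounded down to a multiple of ENTRIES_PER_ROW, and both
   limits are honored only while the request pool is still
   unallocated.  */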


/* The main function of the async I/O handling.  It enqueues requests
   and if necessary starts and handles threads.  */
struct requestlist *
__aio_enqueue_request (aiocb_union *aiocbp, int operation)
{
  int result = 0;
  int policy, prio;
  struct sched_param param;
  struct requestlist *last, *runp, *newp;
  int running = no;

  if (operation == LIO_SYNC || operation == LIO_DSYNC)
    aiocbp->aiocb.aio_reqprio = 0;
  else if (aiocbp->aiocb.aio_reqprio < 0
#ifdef AIO_PRIO_DELTA_MAX
           || aiocbp->aiocb.aio_reqprio > AIO_PRIO_DELTA_MAX
#endif
           )
    {
      /* Invalid priority value.  */
      __set_errno (EINVAL);
      aiocbp->aiocb.__error_code = EINVAL;
      aiocbp->aiocb.__return_value = -1;
      return NULL;
    }

  /* Compute priority for this request.  */
  __pthread_getschedparam (__pthread_self (), &policy, &param);
  prio = param.sched_priority - aiocbp->aiocb.aio_reqprio;
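
  /* Note that a larger aio_reqprio value yields a smaller absolute
     priority: requests are only ever demoted relative to the calling
     thread, never promoted.  */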

  /* Get the mutex.  */
  __pthread_mutex_lock (&__aio_requests_mutex);

  last = NULL;
  runp = requests;
  /* First check whether a request for this file descriptor is already
     being processed.  */
  while (runp != NULL
         && runp->aiocbp->aiocb.aio_fildes < aiocbp->aiocb.aio_fildes)
    {
      last = runp;
      runp = runp->next_fd;
    }

  /* Get a new element for the waiting list.  */
  newp = get_elem ();
  if (newp == NULL)
    {
      __pthread_mutex_unlock (&__aio_requests_mutex);
      __set_errno (EAGAIN);
      return NULL;
    }
  newp->aiocbp = aiocbp;
  newp->waiting = NULL;

  aiocbp->aiocb.__abs_prio = prio;
  aiocbp->aiocb.__policy = policy;
  aiocbp->aiocb.aio_lio_opcode = operation;
  aiocbp->aiocb.__error_code = EINPROGRESS;
  aiocbp->aiocb.__return_value = 0;

  if (runp != NULL
      && runp->aiocbp->aiocb.aio_fildes == aiocbp->aiocb.aio_fildes)
    {
      /* This file descriptor is already being worked on.  It makes no
         sense to start another thread since this new thread would fight
         with the running thread for the resources.  But we also cannot
         say that the thread processing this descriptor shall immediately
         after finishing the current job process this request if there
         are other threads in the running queue which have a higher
         priority.  */

      /* Simply enqueue it after the running one according to the
         priority.  */
      last = NULL;
      while (runp->next_prio != NULL
             && runp->next_prio->aiocbp->aiocb.__abs_prio >= prio)
        {
          last = runp;
          runp = runp->next_prio;
        }

      newp->next_prio = runp->next_prio;
      runp->next_prio = newp;

      running = queued;
    }
  else
    {
      running = yes;
      /* Enqueue this request for a new descriptor.  */
      if (last == NULL)
        {
          newp->last_fd = NULL;
          newp->next_fd = requests;
          if (requests != NULL)
            requests->last_fd = newp;
          requests = newp;
        }
      else
        {
          newp->next_fd = last->next_fd;
          newp->last_fd = last;
          last->next_fd = newp;
          if (newp->next_fd != NULL)
            newp->next_fd->last_fd = newp;
        }

      newp->next_prio = NULL;
      last = NULL;
    }

  if (running == yes)
    {
      /* We try to create a new thread for this file descriptor.  The
         function which gets called will handle all available requests
         for this descriptor and when all are processed it will
         terminate.

         If no new thread can be created or if the specified limit of
         threads for AIO is reached we queue the request.  */

      /* See if we need to and are able to create a thread.  */
      if (nthreads < optim.aio_threads && idle_thread_count == 0)
        {
          pthread_t thid;

          running = newp->running = allocated;

          /* Now try to start a thread.  */
          result = aio_create_helper_thread (&thid, handle_fildes_io, newp);
          if (result == 0)
            /* We managed to enqueue the request.  All errors which can
               happen now can be recognized by calls to `aio_return' and
               `aio_error'.  */
            ++nthreads;
          else
            {
              /* Reset the running flag.  The new request is not running.  */
              running = newp->running = yes;

              if (nthreads == 0)
                {
                  /* We cannot create a thread at the moment and there is
                     also no other thread running.  This is a problem.
                     `errno' is set to EAGAIN if this is only a temporary
                     problem.  */
                  __aio_remove_request (last, newp, 0);
                }
              else
                result = 0;
            }
        }
    }

  /* Enqueue the request in the run queue if it is not yet running.  */
  if (running == yes && result == 0)
    {
      add_request_to_runlist (newp);

      /* If there is a thread waiting for work, then let it know that we
         have just given it something to do.  */
      if (idle_thread_count > 0)
        __pthread_cond_signal (&__aio_new_request_notification);
    }

  if (result == 0)
    newp->running = running;
  else
    {
      /* Something went wrong.  */
      __aio_free_request (newp);
      aiocbp->aiocb.__error_code = result;
      __set_errno (result);
      newp = NULL;
    }

  /* Release the mutex.  */
  __pthread_mutex_unlock (&__aio_requests_mutex);

  return newp;
}
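
/* For reference, an illustrative user-level sequence that ends up in
   __aio_enqueue_request (polling keeps the example short; real code
   would rather use aio_suspend or a notification):

     struct aiocb cb = { 0 };
     cb.aio_fildes = fd;
     cb.aio_buf = buf;
     cb.aio_nbytes = sizeof buf;
     aio_read (&cb);
     while (aio_error (&cb) == EINPROGRESS)
       sleep (1);
     ssize_t n = aio_return (&cb);

   The request itself is serviced by handle_fildes_io below.  */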


static void *
handle_fildes_io (void *arg)
{
  pthread_t self = __pthread_self ();
  struct sched_param param;
  struct requestlist *runp = (struct requestlist *) arg;
  aiocb_union *aiocbp;
  int policy;
  int fildes;

  __pthread_getschedparam (self, &policy, &param);

  do
    {
      /* If runp is NULL, then we were created to service the work queue
         in general, not to handle any particular request.  In that case we
         skip the "do work" stuff on the first pass, and go directly to the
         "get work off the work queue" part of this loop, which is near the
         end.  */
      if (runp == NULL)
        __pthread_mutex_lock (&__aio_requests_mutex);
      else
        {
          /* Hopefully this request is marked as running.  */
          assert (runp->running == allocated);

          /* Update our variables.  */
          aiocbp = runp->aiocbp;
          fildes = aiocbp->aiocb.aio_fildes;

          /* Change the priority to the requested value (if necessary).  */
          if (aiocbp->aiocb.__abs_prio != param.sched_priority
              || aiocbp->aiocb.__policy != policy)
            {
              param.sched_priority = aiocbp->aiocb.__abs_prio;
              policy = aiocbp->aiocb.__policy;
              __pthread_setschedparam (self, policy, &param);
            }

          /* Process request pointed to by RUNP.  We must not be disturbed
             by signals.  */
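          /* Bit 7 (value 128) of the opcode marks a request submitted
             through the 64-bit (aiocb64) interface; the low seven bits
             carry the LIO operation code itself.  */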
          if ((aiocbp->aiocb.aio_lio_opcode & 127) == LIO_READ)
            {
              if (sizeof (off_t) != sizeof (off64_t)
                  && aiocbp->aiocb.aio_lio_opcode & 128)
                aiocbp->aiocb.__return_value =
                  TEMP_FAILURE_RETRY (__pread64 (fildes, (void *)
                                                 aiocbp->aiocb64.aio_buf,
                                                 aiocbp->aiocb64.aio_nbytes,
                                                 aiocbp->aiocb64.aio_offset));
              else
                aiocbp->aiocb.__return_value =
                  TEMP_FAILURE_RETRY (__pread (fildes,
                                               (void *)
                                               aiocbp->aiocb.aio_buf,
                                               aiocbp->aiocb.aio_nbytes,
                                               aiocbp->aiocb.aio_offset));

              if (aiocbp->aiocb.__return_value == -1 && errno == ESPIPE)
                /* The Linux kernel is different from others.  It returns
                   ESPIPE if using pread on a socket.  Other platforms
                   simply ignore the offset parameter and behave like
                   read.  */
                aiocbp->aiocb.__return_value =
                  TEMP_FAILURE_RETRY (read (fildes,
                                            (void *) aiocbp->aiocb64.aio_buf,
                                            aiocbp->aiocb64.aio_nbytes));
            }
          else if ((aiocbp->aiocb.aio_lio_opcode & 127) == LIO_WRITE)
            {
              if (sizeof (off_t) != sizeof (off64_t)
                  && aiocbp->aiocb.aio_lio_opcode & 128)
                aiocbp->aiocb.__return_value =
                  TEMP_FAILURE_RETRY (__pwrite64 (fildes, (const void *)
                                                  aiocbp->aiocb64.aio_buf,
                                                  aiocbp->aiocb64.aio_nbytes,
                                                  aiocbp->aiocb64.aio_offset));
              else
                aiocbp->aiocb.__return_value =
                  TEMP_FAILURE_RETRY (__pwrite (fildes, (const void *)
                                                aiocbp->aiocb.aio_buf,
                                                aiocbp->aiocb.aio_nbytes,
                                                aiocbp->aiocb.aio_offset));

              if (aiocbp->aiocb.__return_value == -1 && errno == ESPIPE)
                /* The Linux kernel is different from others.  It returns
                   ESPIPE if using pwrite on a socket.  Other platforms
                   simply ignore the offset parameter and behave like
                   write.  */
                aiocbp->aiocb.__return_value =
                  TEMP_FAILURE_RETRY (write (fildes,
                                             (void *) aiocbp->aiocb64.aio_buf,
                                             aiocbp->aiocb64.aio_nbytes));
            }
          else if (aiocbp->aiocb.aio_lio_opcode == LIO_DSYNC)
            aiocbp->aiocb.__return_value =
              TEMP_FAILURE_RETRY (fdatasync (fildes));
          else if (aiocbp->aiocb.aio_lio_opcode == LIO_SYNC)
            aiocbp->aiocb.__return_value =
              TEMP_FAILURE_RETRY (fsync (fildes));
          else
            {
              /* This is an invalid opcode.  */
              aiocbp->aiocb.__return_value = -1;
              __set_errno (EINVAL);
            }

          /* Get the mutex.  */
          __pthread_mutex_lock (&__aio_requests_mutex);

          if (aiocbp->aiocb.__return_value == -1)
            aiocbp->aiocb.__error_code = errno;
          else
            aiocbp->aiocb.__error_code = 0;

          /* Send the signal to notify about finished processing of the
             request.  */
          __aio_notify (runp);

          /* For debugging purposes we reset the running flag of the
             finished request.  */
          assert (runp->running == allocated);
          runp->running = done;

          /* Now dequeue the current request.  */
          __aio_remove_request (NULL, runp, 0);
          if (runp->next_prio != NULL)
            add_request_to_runlist (runp->next_prio);

          /* Free the old element.  */
          __aio_free_request (runp);
        }

      runp = runlist;

      /* If the runlist is empty, then we sleep for a while, waiting for
         something to arrive in it.  */
      if (runp == NULL && optim.aio_idle_time >= 0)
        {
          struct timespec now;
          struct timespec wakeup_time;

          ++idle_thread_count;
          __clock_gettime (CLOCK_REALTIME, &now);
          wakeup_time.tv_sec = now.tv_sec + optim.aio_idle_time;
          wakeup_time.tv_nsec = now.tv_nsec;
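          /* Keep tv_nsec in its canonical range.  (Purely defensive:
             nothing is added to now.tv_nsec above, so this adjustment
             cannot currently trigger.)  */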
          if (wakeup_time.tv_nsec >= 1000000000)
            {
              wakeup_time.tv_nsec -= 1000000000;
              ++wakeup_time.tv_sec;
            }
          __pthread_cond_timedwait (&__aio_new_request_notification,
                                    &__aio_requests_mutex,
                                    &wakeup_time);
          --idle_thread_count;
          runp = runlist;
        }

      if (runp == NULL)
        --nthreads;
      else
        {
          assert (runp->running == yes);
          runp->running = allocated;
          runlist = runp->next_run;

          /* If we have a request to process, and there's still another in
             the run list, then we need to either wake up or create a new
             thread to service the request that is still in the run list.  */
          if (runlist != NULL)
            {
              /* There are at least two items in the work queue to work on.
                 If there are other idle threads, then we should wake them
                 up for these other work elements; otherwise, we should try
                 to create a new thread.  */
              if (idle_thread_count > 0)
                __pthread_cond_signal (&__aio_new_request_notification);
              else if (nthreads < optim.aio_threads)
                {
                  pthread_t thid;
                  pthread_attr_t attr;

                  /* Make sure the thread is created detached.  */
                  __pthread_attr_init (&attr);
                  __pthread_attr_setdetachstate (&attr,
                                                 PTHREAD_CREATE_DETACHED);

                  /* Now try to start a thread.  If we fail, no big deal,
                     because we know that there is at least one thread (us)
                     that is working on AIO operations.  */
                  if (__pthread_create (&thid, &attr, handle_fildes_io, NULL)
                      == 0)
                    ++nthreads;
                }
            }
        }

      /* Release the mutex.  */
      __pthread_mutex_unlock (&__aio_requests_mutex);
    }
  while (runp != NULL);

  return NULL;
}


/* Free allocated resources.  */
libc_freeres_fn (free_res)
{
  size_t row;

  /* Only the first pool_size rows have actually been allocated; the
     realloc'd tail of the row table is uninitialized.  */
  for (row = 0; row < pool_size; ++row)
    free (pool[row]);

  free (pool);
}


/* Add NEWREQUEST to the runlist.  The __abs_prio field of NEWREQUEST
   must be set correctly for this to work.  Also, the caller had better
   set NEWREQUEST's "running" flag to "yes" before releasing the lock,
   or an assertion will fire.  */
static void
add_request_to_runlist (struct requestlist *newrequest)
{
  int prio = newrequest->aiocbp->aiocb.__abs_prio;
  struct requestlist *runp;

  if (runlist == NULL || runlist->aiocbp->aiocb.__abs_prio < prio)
    {
      newrequest->next_run = runlist;
      runlist = newrequest;
    }
  else
    {
      runp = runlist;

      while (runp->next_run != NULL
             && runp->next_run->aiocbp->aiocb.__abs_prio >= prio)
        runp = runp->next_run;

      newrequest->next_run = runp->next_run;
      runp->next_run = newrequest;
    }
}

#if PTHREAD_IN_LIBC
versioned_symbol (libc, __aio_init, aio_init, GLIBC_2_34);
# if OTHER_SHLIB_COMPAT (librt, GLIBC_2_1, GLIBC_2_34)
compat_symbol (librt, __aio_init, aio_init, GLIBC_2_1);
# endif
#else /* !PTHREAD_IN_LIBC */
weak_alias (__aio_init, aio_init)
#endif /* !PTHREAD_IN_LIBC */