#include <evfibers/config.h>
#include <linux/limits.h>

#ifdef HAVE_VALGRIND_H
#include <valgrind/valgrind.h>
#else
#define RUNNING_ON_VALGRIND (0)
#define VALGRIND_STACK_REGISTER(a,b) (void)0
#endif

#ifdef FBR_EIO_ENABLED
#include <evfibers/eio.h>
#endif
#include <evfibers_private/fiber.h>

#ifndef LIST_FOREACH_SAFE
#define LIST_FOREACH_SAFE(var, head, field, next_var)              \
	for ((var) = ((head)->lh_first);                           \
		(var) && ((next_var) = ((var)->field.le_next), 1); \
		(var) = (next_var))
#endif

#ifndef TAILQ_FOREACH_SAFE
#define TAILQ_FOREACH_SAFE(var, head, field, next_var)                      \
	for ((var) = ((head)->tqh_first);                                    \
		(var) ? ({ (next_var) = ((var)->field.tqe_next); 1; }) : 0; \
		(var) = (next_var))
#endif
#define ENSURE_ROOT_FIBER do { \
	assert(fctx->__p->sp->fiber == &fctx->__p->root); \
} while (0)

#define CURRENT_FIBER (fctx->__p->sp->fiber)
#define CURRENT_FIBER_ID (fbr_id_pack(CURRENT_FIBER))
#define CALLED_BY_ROOT ((fctx->__p->sp - 1)->fiber == &fctx->__p->root)

#define unpack_transfer_errno(value, ptr, id)           \
	do {                                            \
		if (-1 == fbr_id_unpack(fctx, ptr, id)) \
			return (value);                 \
	} while (0)

#define return_success(value)                \
	do {                                 \
		fctx->f_errno = FBR_SUCCESS; \
		return (value);              \
	} while (0)

#define return_error(value, code)       \
	do {                            \
		fctx->f_errno = (code); \
		return (value);         \
	} while (0)
const fbr_id_t FBR_ID_NULL = {0, NULL};
static const char default_buffer_pattern[] = "/dev/shm/fbr_buffer.XXXXXXXXX";

static fbr_id_t fbr_id_pack(struct fbr_fiber *fiber)
{
	return (struct fbr_id_s){.g = fiber->id, .p = fiber};
}

static int fbr_id_unpack(FBR_P_ struct fbr_fiber **ptr, fbr_id_t id)
{
	struct fbr_fiber *fiber = id.p;
	if (fiber->id != id.g)
		return_error(-1, FBR_ENOFIBER);
	if (NULL != ptr)
		*ptr = fiber;
	return 0;
}
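/*
 * Illustrative note (not part of the original source): an fbr_id_t couples a
 * generation counter ("g") with the fiber pointer ("p"), so a stale handle is
 * detected when the generations no longer match.  Callers normally just test
 * API return values; worker_func below is a hypothetical fiber function:
 *
 *     fbr_id_t id = fbr_create(&context, "worker", worker_func, NULL, 0);
 *     if (-1 == fbr_transfer(&context, id) &&
 *             FBR_ENOFIBER == context.f_errno)
 *         fbr_log_e(&context, "fiber is gone already");
 */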
static void pending_async_cb(EV_P_ ev_async *w, _unused_ int revents)
{
	struct fbr_context *fctx;
	struct fbr_id_tailq_i *item;
	int retval;
	fctx = (struct fbr_context *)w->data;

	if (TAILQ_EMPTY(&fctx->__p->pending_fibers)) {
		ev_async_stop(EV_A_ &fctx->__p->pending_async);
		return;
	}

	item = TAILQ_FIRST(&fctx->__p->pending_fibers);
	assert(item->head == &fctx->__p->pending_fibers);
	ev_async_send(EV_A_ &fctx->__p->pending_async);

	retval = fbr_transfer(FBR_A_ item->id);
	if (-1 == retval && FBR_ENOFIBER != fctx->f_errno) {
		fbr_log_e(FBR_A_ "libevfibers: unexpected error trying to call"
				" a fiber by id: %s",
				fbr_strerror(FBR_A_ fctx->f_errno));
	}
}
static void *allocate_in_fiber(FBR_P_ size_t size, struct fbr_fiber *in)
{
	struct mem_pool *pool_entry;
	pool_entry = malloc(size + sizeof(struct mem_pool));
	if (NULL == pool_entry) {
		fbr_log_e(FBR_A_ "libevfibers: unable to allocate %zu bytes",
				size + sizeof(struct mem_pool));
		abort();
	}
	pool_entry->ptr = pool_entry;
	pool_entry->destructor = NULL;
	pool_entry->destructor_context = NULL;
	LIST_INSERT_HEAD(&in->pool, pool_entry, entries);
	return pool_entry + 1;
}
static void stdio_logger(FBR_P_ struct fbr_logger *logger,
		enum fbr_log_level level, const char *format, va_list ap)
{
	struct fbr_fiber *fiber;
	FILE *stream = stdout;
	char *str_level = "NOTICE";
	ev_tstamp tstamp;

	if (level > logger->level)
		return;
	fiber = CURRENT_FIBER;
	switch (level) {
	case FBR_LOG_WARNING:
		str_level = "WARNING";
		break;
	case FBR_LOG_NOTICE:
		str_level = "NOTICE";
		break;
	default:
		break;
	}
	tstamp = ev_now(fctx->__p->loop);
	fprintf(stream, "%.6f %-7s %-16s ", tstamp, str_level, fiber->name);
	vfprintf(stream, format, ap);
	fprintf(stream, "\n");
}
void fbr_init(FBR_P_ struct ev_loop *loop)
{
	struct fbr_fiber *root;
	struct fbr_logger *logger;
	char *buffer_pattern;

	fctx->__p = malloc(sizeof(struct fbr_context_private));
	LIST_INIT(&fctx->__p->reclaimed);
	LIST_INIT(&fctx->__p->root.children);
	LIST_INIT(&fctx->__p->root.pool);
	TAILQ_INIT(&fctx->__p->root.destructors);
	TAILQ_INIT(&fctx->__p->pending_fibers);

	root = &fctx->__p->root;
	fctx->__p->last_id = 0;
	root->id = fctx->__p->last_id++;
	coro_create(&root->ctx, NULL, NULL, NULL, 0);

	logger = allocate_in_fiber(FBR_A_ sizeof(struct fbr_logger), root);
	logger->logv = stdio_logger;
	logger->level = FBR_LOG_NOTICE;
	fctx->logger = logger;

	fctx->__p->sp = fctx->__p->stack;
	fctx->__p->sp->fiber = root;
	fctx->__p->backtraces_enabled = 1;
	fill_trace_info(FBR_A_ &fctx->__p->sp->tinfo);
	fctx->__p->loop = loop;
	fctx->__p->pending_async.data = fctx;
	fctx->__p->backtraces_enabled = 0;
	memset(&fctx->__p->key_free_mask, 0x00,
			sizeof(fctx->__p->key_free_mask));
	ev_async_init(&fctx->__p->pending_async, pending_async_cb);

	buffer_pattern = getenv("FBR_BUFFER_FILE_PATTERN");
	if (buffer_pattern)
		fctx->__p->buffer_file_pattern = buffer_pattern;
	else
		fctx->__p->buffer_file_pattern = default_buffer_pattern;
}
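/*
 * Illustrative usage sketch (assumption, not part of the original source): a
 * typical program wraps the default libev loop in a context, spawns a first
 * fiber from the root and then runs the loop; worker_func is hypothetical.
 *
 *     struct fbr_context context;
 *     fbr_init(&context, EV_DEFAULT);
 *     fbr_id_t id = fbr_create(&context, "worker", worker_func, NULL, 0);
 *     fbr_transfer(&context, id);
 *     ev_run(EV_DEFAULT, 0);
 *     fbr_destroy(&context);
 */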
243 return "Invalid argument";
245 return "No such fiber";
247 return "System error, consult system errno";
248 case FBR_EBUFFERMMAP:
249 return "Failed to mmap two adjacent regions";
251 return "Fiber-local key does not exist";
253 return "Protobuf unpacking error";
254 case FBR_EBUFFERNOSPACE:
255 return "Not enough space in the buffer";
257 return "libeio request error";
259 return "Unknown error";
	va_start(ap, format);
	va_start(ap, format);
	va_start(ap, format);
	va_start(ap, format);
	va_start(ap, format);
static void id_tailq_i_set(_unused_ FBR_P_
		struct fbr_id_tailq_i *item,
		struct fbr_fiber *fiber)
{
	item->id = fbr_id_pack(fiber);
	item->ev = NULL;
}

static void reclaim_children(FBR_P_ struct fbr_fiber *fiber)
{
	struct fbr_fiber *f;
	LIST_FOREACH(f, &fiber->children, entries.children) {
		fbr_reclaim(FBR_A_ fbr_id_pack(f));
	}
}

static void fbr_free_in_fiber(_unused_ FBR_P_ _unused_ struct fbr_fiber *fiber,
		void *ptr, int destructor);
void fbr_destroy(FBR_P)
{
	struct fbr_fiber *fiber, *x;
	struct mem_pool *p, *x2;

	reclaim_children(FBR_A_ &fctx->__p->root);

	LIST_FOREACH_SAFE(p, &fctx->__p->root.pool, entries, x2) {
		fbr_free_in_fiber(FBR_A_ &fctx->__p->root, p + 1, 1);
	}

	LIST_FOREACH_SAFE(fiber, &fctx->__p->reclaimed, entries.reclaimed, x) {
		if (0 != munmap(fiber->stack, fiber->stack_size))
			err(EXIT_FAILURE, "munmap");
		free(fiber);
	}
	free(fctx->__p);
}

void fbr_enable_backtraces(FBR_P_ int enabled)
{
	if (enabled)
		fctx->__p->backtraces_enabled = 1;
	else
		fctx->__p->backtraces_enabled = 0;
}
static void cancel_ev(FBR_P_ struct fbr_ev_base *ev)
{
	fbr_destructor_remove(FBR_A_ &ev->item.dtor, 1 /* call it */);
}

static void post_ev(_unused_ FBR_P_ struct fbr_fiber *fiber,
		struct fbr_ev_base *ev)
{
	assert(NULL != fiber->ev.waiting);

	fiber->ev.arrived = 1;
	ev->arrived = 1;
}

static void ev_watcher_cb(_unused_ EV_P_ ev_watcher *w, _unused_ int event)
{
	struct fbr_fiber *fiber;
	struct fbr_ev_watcher *ev = w->data;
	struct fbr_context *fctx = ev->ev_base.fctx;
	int retval;

	retval = fbr_id_unpack(FBR_A_ &fiber, ev->ev_base.id);
	if (-1 == retval) {
		fbr_log_e(FBR_A_ "libevfibers: fiber is about to be called by"
				" the watcher callback, but its id is not valid: %s",
				fbr_strerror(FBR_A_ fctx->f_errno));
		abort();
	}

	post_ev(FBR_A_ fiber, &ev->ev_base);

	retval = fbr_transfer(FBR_A_ fbr_id_pack(fiber));
	assert(0 == retval);
}
static void fbr_free_in_fiber(_unused_ FBR_P_ _unused_ struct fbr_fiber *fiber,
		void *ptr, int destructor)
{
	struct mem_pool *pool_entry = NULL;
	if (NULL == ptr)
		return;
	pool_entry = (struct mem_pool *)ptr - 1;
	if (pool_entry->ptr != pool_entry) {
		fbr_log_e(FBR_A_ "libevfibers: address %p does not look like a"
				" fiber memory pool entry", ptr);
		if (!RUNNING_ON_VALGRIND)
			abort();
	}
	LIST_REMOVE(pool_entry, entries);
	if (destructor && pool_entry->destructor)
		pool_entry->destructor(FBR_A_ ptr, pool_entry->destructor_context);
	free(pool_entry);
}
static void fiber_cleanup(FBR_P_ struct fbr_fiber *fiber)
{
	struct mem_pool *p, *x;
	struct fbr_destructor *dtor;

	LIST_REMOVE(fiber, entries.children);
	TAILQ_FOREACH(dtor, &fiber->destructors, entries) {
		dtor->func(FBR_A_ dtor->arg);
	}
	LIST_FOREACH_SAFE(p, &fiber->pool, entries, x) {
		fbr_free_in_fiber(FBR_A_ fiber, p + 1, 1);
	}
}

static void filter_fiber_stack(FBR_P_ struct fbr_fiber *fiber)
{
	struct fbr_stack_item *sp;
	for (sp = fctx->__p->stack; sp < fctx->__p->sp; sp++) {
		if (sp->fiber == fiber) {
			memmove(sp, sp + 1, (fctx->__p->sp - sp) * sizeof(*sp));
			fctx->__p->sp--;
		}
	}
}
int fbr_reclaim(FBR_P_ fbr_id_t id)
{
	struct fbr_fiber *fiber;
	struct fbr_fiber *f;
	struct fbr_mutex mutex;
	int retval;

	unpack_transfer_errno(-1, &fiber, id);

	fbr_mutex_init(FBR_A_ &mutex);
	fbr_mutex_lock(FBR_A_ &mutex);
	while (fiber->no_reclaim) {
		fiber->want_reclaim = 1;
		assert("Attempt to reclaim self while no_reclaim is set would"
				" block forever" && fiber != CURRENT_FIBER);
		if (-1 == fbr_id_unpack(FBR_A_ NULL, id) &&
				FBR_ENOFIBER == fctx->f_errno)
			return_success(0);
		retval = fbr_cond_wait(FBR_A_ &fiber->reclaim_cond, &mutex);
		assert(0 == retval);
	}
	fbr_mutex_unlock(FBR_A_ &mutex);

	if (-1 == fbr_id_unpack(FBR_A_ NULL, id) &&
			FBR_ENOFIBER == fctx->f_errno)
		return_success(0);

	fill_trace_info(FBR_A_ &fiber->reclaim_tinfo);
	reclaim_children(FBR_A_ fiber);
	fiber_cleanup(FBR_A_ fiber);
	fiber->id = fctx->__p->last_id++;

	LIST_FOREACH(f, &fctx->__p->reclaimed, entries.reclaimed) {
		assert(f != fiber);
	}
	LIST_INSERT_HEAD(&fctx->__p->reclaimed, fiber, entries.reclaimed);

	filter_fiber_stack(FBR_A_ fiber);

	if (CURRENT_FIBER == fiber)
		fbr_yield(FBR_A);

	return_success(0);
}
int fbr_set_reclaim(FBR_P_ fbr_id_t id)
{
	struct fbr_fiber *fiber;
	unpack_transfer_errno(-1, &fiber, id);
	fiber->no_reclaim = 0;
	fbr_cond_broadcast(FBR_A_ &fiber->reclaim_cond);
	return_success(0);
}

int fbr_set_noreclaim(FBR_P_ fbr_id_t id)
{
	struct fbr_fiber *fiber;
	unpack_transfer_errno(-1, &fiber, id);
	fiber->no_reclaim = 1;
	return_success(0);
}

int fbr_want_reclaim(FBR_P_ fbr_id_t id)
{
	struct fbr_fiber *fiber;
	unpack_transfer_errno(-1, &fiber, id);
	return_success(fiber->want_reclaim);
}

int fbr_is_reclaimed(_unused_ FBR_P_ fbr_id_t id)
{
	if (0 == fbr_id_unpack(FBR_A_ NULL, id))
		return 0;
	return 1;
}

fbr_id_t fbr_self(FBR_P)
{
	return CURRENT_FIBER_ID;
}
static void call_wrapper(FBR_P)
{
	int retval;
	struct fbr_fiber *fiber = CURRENT_FIBER;

	fiber->func(FBR_A_ fiber->func_arg);

	retval = fbr_reclaim(FBR_A_ fbr_id_pack(fiber));
	assert(0 == retval);
	fbr_yield(FBR_A);
	assert(NULL);
}
enum ev_action_hint {
	EV_AH_OK = 0,
	EV_AH_ARRIVED,
	EV_AH_EINVAL,
};

static void item_dtor(_unused_ FBR_P_ void *arg)
{
	struct fbr_id_tailq_i *item = arg;

	if (item->head) {
		TAILQ_REMOVE(item->head, item, entries);
	}
}

static enum ev_action_hint prepare_ev(FBR_P_ struct fbr_ev_base *ev)
{
	struct fbr_ev_watcher *e_watcher;
	struct fbr_ev_mutex *e_mutex;
	struct fbr_ev_cond_var *e_cond;
	struct fbr_id_tailq_i *item = &ev->item;

	ev->arrived = 0;
	ev->item.dtor.func = item_dtor;
	ev->item.dtor.arg = item;
	fbr_destructor_add(FBR_A_ &ev->item.dtor);

	switch (ev->type) {
	case FBR_EV_WATCHER:
		e_watcher = fbr_ev_upcast(ev, ev_watcher);
		if (!ev_is_active(e_watcher->w)) {
			fbr_destructor_remove(FBR_A_ &ev->item.dtor,
					0 /* do not call it */);
			return EV_AH_EINVAL;
		}
		e_watcher->w->data = e_watcher;
		ev_set_cb(e_watcher->w, ev_watcher_cb);
		break;
	case FBR_EV_MUTEX:
		e_mutex = fbr_ev_upcast(ev, ev_mutex);
		if (fbr_id_isnull(e_mutex->mutex->locked_by)) {
			e_mutex->mutex->locked_by = CURRENT_FIBER_ID;
			return EV_AH_ARRIVED;
		}
		id_tailq_i_set(FBR_A_ item, CURRENT_FIBER);
		item->ev = ev;
		TAILQ_INSERT_TAIL(&e_mutex->mutex->pending, item, entries);
		item->head = &e_mutex->mutex->pending;
		break;
	case FBR_EV_COND_VAR:
		e_cond = fbr_ev_upcast(ev, ev_cond_var);
		if (fbr_id_isnull(e_cond->mutex->locked_by)) {
			fbr_destructor_remove(FBR_A_ &ev->item.dtor,
					0 /* do not call it */);
			return EV_AH_EINVAL;
		}
		id_tailq_i_set(FBR_A_ item, CURRENT_FIBER);
		item->ev = ev;
		TAILQ_INSERT_TAIL(&e_cond->cond->waiting, item, entries);
		item->head = &e_cond->cond->waiting;
		fbr_mutex_unlock(FBR_A_ e_cond->mutex);
		break;
#ifdef FBR_EIO_ENABLED

	fbr_destructor_remove(FBR_A_ &ev->item.dtor, 1 /* call it */);

	ev_set_cb(e_watcher->w, NULL);

#ifdef FBR_EIO_ENABLED
static void watcher_timer_dtor(_unused_ FBR_P_ void *_arg)
{
	struct ev_timer *w = _arg;
	ev_timer_stop(fctx->__p->loop, w);
}
int fbr_ev_wait_to(FBR_P_ struct fbr_ev_base *events[], ev_tstamp timeout)
{
	size_t size = 0;
	ev_timer timer;
	struct fbr_ev_watcher watcher;
	struct fbr_destructor dtor = FBR_DESTRUCTOR_INITIALIZER;
	struct fbr_ev_base **new_events;
	struct fbr_ev_base **ev_pptr;
	int n_events;

	ev_timer_init(&timer, NULL, timeout, 0.);
	ev_timer_start(fctx->__p->loop, &timer);
	fbr_ev_watcher_init(FBR_A_ &watcher,
			(struct ev_watcher *)&timer);
	dtor.func = watcher_timer_dtor;
	dtor.arg = &timer;
	fbr_destructor_add(FBR_A_ &dtor);

	for (ev_pptr = events; NULL != *ev_pptr; ev_pptr++)
		size++;
	/* +2: room for the appended timer watcher and the NULL terminator */
	new_events = alloca((size + 2) * sizeof(void *));
	memcpy(new_events, events, size * sizeof(void *));
	new_events[size] = &watcher.ev_base;
	new_events[size + 1] = NULL;
	n_events = fbr_ev_wait(FBR_A_ new_events);
	fbr_destructor_remove(FBR_A_ &dtor, 1 /* call it */);

	if (watcher.ev_base.arrived)
		n_events--;
	return n_events;
}
int fbr_ev_wait(FBR_P_ struct fbr_ev_base *events[])
{
	struct fbr_fiber *fiber = CURRENT_FIBER;
	enum ev_action_hint hint;
	int num = 0;
	size_t i;

	fiber->ev.arrived = 0;
	fiber->ev.waiting = events;

	for (i = 0; NULL != events[i]; i++) {
		hint = prepare_ev(FBR_A_ events[i]);
		switch (hint) {
		case EV_AH_OK:
			break;
		case EV_AH_ARRIVED:
			fiber->ev.arrived = 1;
			events[i]->arrived = 1;
			break;
		case EV_AH_EINVAL:
			return_error(-1, FBR_EINVAL);
		}
	}

	while (0 == fiber->ev.arrived)
		fbr_yield(FBR_A);

	for (i = 0; NULL != events[i]; i++) {
		if (events[i]->arrived) {
			num++;
			finish_ev(FBR_A_ events[i]);
		} else
			cancel_ev(FBR_A_ events[i]);
	}
	return_success(num);
}

int fbr_ev_wait_one(FBR_P_ struct fbr_ev_base *one)
{
	struct fbr_fiber *fiber = CURRENT_FIBER;
	enum ev_action_hint hint;
	struct fbr_ev_base *events[] = {one, NULL};

	fiber->ev.arrived = 0;
	fiber->ev.waiting = events;

	hint = prepare_ev(FBR_A_ one);
	switch (hint) {
	case EV_AH_OK:
		break;
	case EV_AH_ARRIVED:
		goto finish;
	case EV_AH_EINVAL:
		return_error(-1, FBR_EINVAL);
	}

	while (0 == fiber->ev.arrived)
		fbr_yield(FBR_A);

finish:
	finish_ev(FBR_A_ one);
	return 0;
}
int fbr_transfer(FBR_P_ fbr_id_t to)
{
	struct fbr_fiber *callee;
	struct fbr_fiber *caller = fctx->__p->sp->fiber;

	unpack_transfer_errno(-1, &callee, to);

	fctx->__p->sp++;
	fctx->__p->sp->fiber = callee;
	fill_trace_info(FBR_A_ &fctx->__p->sp->tinfo);

	coro_transfer(&caller->ctx, &callee->ctx);

	return_success(0);
}
void fbr_yield(FBR_P)
{
	struct fbr_fiber *callee;
	struct fbr_fiber *caller;

	assert("Attempt to yield in a root fiber" &&
			fctx->__p->sp->fiber != &fctx->__p->root);
	callee = fctx->__p->sp->fiber;
	caller = (--fctx->__p->sp)->fiber;
	coro_transfer(&callee->ctx, &caller->ctx);
}
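/*
 * Illustrative sketch (assumption, not part of the original source): user
 * code rarely calls fbr_yield()/fbr_transfer() directly; a fiber usually
 * yields through the blocking wrappers and is resumed by the root fiber from
 * a libev watcher callback:
 *
 *     static void worker_func(FBR_P_ void *arg)
 *     {
 *         (void)arg;
 *         for (;;) {
 *             fbr_log_n(FBR_A_ "tick");
 *             fbr_sleep(FBR_A_ 1.0);  // yields until the timer fires
 *         }
 *     }
 */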
int fbr_fd_nonblock(FBR_P_ int fd)
{
	int flags, s;

	flags = fcntl(fd, F_GETFL, 0);
	if (flags == -1)
		return_error(-1, FBR_ESYSTEM);

	flags |= O_NONBLOCK;
	s = fcntl(fd, F_SETFL, flags);
	if (s == -1)
		return_error(-1, FBR_ESYSTEM);

	return_success(0);
}

	memset(ev, 0x00, sizeof(*ev));
	ev->id = CURRENT_FIBER_ID;
static void watcher_io_dtor(_unused_ FBR_P_ void *_arg)
{
	struct ev_io *w = _arg;
	ev_io_stop(fctx->__p->loop, w);
}
int fbr_connect(FBR_P_ int sockfd, const struct sockaddr *addr,
		socklen_t addrlen)
{
	int r;
	socklen_t len = sizeof(r);
	ev_io io;
	struct fbr_ev_watcher watcher;
	struct fbr_destructor dtor = FBR_DESTRUCTOR_INITIALIZER;

	r = connect(sockfd, addr, addrlen);
	if ((-1 == r) && (EINPROGRESS != errno))
		return -1;

	ev_io_init(&io, NULL, sockfd, EV_WRITE);
	ev_io_start(fctx->__p->loop, &io);
	dtor.func = watcher_io_dtor;
	dtor.arg = &io;
	fbr_destructor_add(FBR_A_ &dtor);

	fbr_ev_watcher_init(FBR_A_ &watcher, (ev_watcher *)&io);
	fbr_ev_wait_one(FBR_A_ &watcher.ev_base);

	if (-1 == getsockopt(sockfd, SOL_SOCKET, SO_ERROR, (void *)&r, &len)) {
		r = -1;
	} else if (0 != r) {
		errno = r;
		r = -1;
	}

	fbr_destructor_remove(FBR_A_ &dtor, 0 /* do not call it */);
	ev_io_stop(fctx->__p->loop, &io);
	return r;
}
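/*
 * Illustrative usage sketch (assumption, not part of the original source):
 * inside a fiber, fbr_connect() looks like a blocking connect() while other
 * fibers and the libev loop keep running; `sin` is a hypothetical
 * struct sockaddr_in prepared by the caller.
 *
 *     int fd = socket(AF_INET, SOCK_STREAM, 0);
 *     fbr_fd_nonblock(FBR_A_ fd);
 *     if (-1 == fbr_connect(FBR_A_ fd, (struct sockaddr *)&sin, sizeof(sin)))
 *         fbr_log_e(FBR_A_ "connect failed: %s", strerror(errno));
 */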
ssize_t fbr_read(FBR_P_ int fd, void *buf, size_t count)
{
	ssize_t r;
	ev_io io;
	struct fbr_ev_watcher watcher;
	struct fbr_destructor dtor = FBR_DESTRUCTOR_INITIALIZER;

	ev_io_init(&io, NULL, fd, EV_READ);
	ev_io_start(fctx->__p->loop, &io);
	dtor.func = watcher_io_dtor;
	dtor.arg = &io;
	fbr_destructor_add(FBR_A_ &dtor);

	fbr_ev_watcher_init(FBR_A_ &watcher, (ev_watcher *)&io);
	fbr_ev_wait_one(FBR_A_ &watcher.ev_base);

	fbr_destructor_remove(FBR_A_ &dtor, 0 /* do not call it */);
	do {
		r = read(fd, buf, count);
	} while (-1 == r && EINTR == errno);

	ev_io_stop(fctx->__p->loop, &io);
	return r;
}
ssize_t fbr_read_all(FBR_P_ int fd, void *buf, size_t count)
{
	ssize_t r;
	size_t done = 0;
	ev_io io;
	struct fbr_ev_watcher watcher;
	struct fbr_destructor dtor = FBR_DESTRUCTOR_INITIALIZER;

	ev_io_init(&io, NULL, fd, EV_READ);
	ev_io_start(fctx->__p->loop, &io);
	dtor.func = watcher_io_dtor;
	dtor.arg = &io;
	fbr_destructor_add(FBR_A_ &dtor);
	fbr_ev_watcher_init(FBR_A_ &watcher, (ev_watcher *)&io);

	while (count != done) {
		fbr_ev_wait_one(FBR_A_ &watcher.ev_base);
		r = read(fd, buf + done, count - done);
		if (-1 == r) {
			if (EINTR == errno || EAGAIN == errno)
				continue;
			goto error;
		}
		if (0 == r) /* EOF */
			break;
		done += r;
	}
	fbr_destructor_remove(FBR_A_ &dtor, 0 /* do not call it */);
	ev_io_stop(fctx->__p->loop, &io);
	return (ssize_t)done;

error:
	fbr_destructor_remove(FBR_A_ &dtor, 0 /* do not call it */);
	ev_io_stop(fctx->__p->loop, &io);
	return -1;
}
ssize_t fbr_readline(FBR_P_ int fd, void *buffer, size_t n)
{
	ssize_t num_read;
	size_t total_read;
	char *buf;
	char ch;

	if (n <= 0 || buffer == NULL) {
		errno = EINVAL;
		return -1;
	}

	buf = buffer;
	total_read = 0;
	for (;;) {
		num_read = fbr_read(FBR_A_ fd, &ch, 1);

		if (num_read == -1) {
			return -1;
		} else if (num_read == 0) {
			break;
		} else {
			if (total_read < n - 1) {
				total_read++;
				*buf++ = ch;
			}
			if (ch == '\n')
				break;
		}
	}

	*buf = '\0';
	return total_read;
}
ssize_t fbr_write(FBR_P_ int fd, const void *buf, size_t count)
{
	ssize_t r;
	ev_io io;
	struct fbr_ev_watcher watcher;
	struct fbr_destructor dtor = FBR_DESTRUCTOR_INITIALIZER;

	ev_io_init(&io, NULL, fd, EV_WRITE);
	ev_io_start(fctx->__p->loop, &io);
	dtor.func = watcher_io_dtor;
	dtor.arg = &io;
	fbr_destructor_add(FBR_A_ &dtor);

	fbr_ev_watcher_init(FBR_A_ &watcher, (ev_watcher *)&io);
	fbr_ev_wait_one(FBR_A_ &watcher.ev_base);

	do {
		r = write(fd, buf, count);
	} while (-1 == r && EINTR == errno);

	fbr_destructor_remove(FBR_A_ &dtor, 0 /* do not call it */);
	ev_io_stop(fctx->__p->loop, &io);
	return r;
}
ssize_t fbr_write_all(FBR_P_ int fd, const void *buf, size_t count)
{
	ssize_t r;
	size_t done = 0;
	ev_io io;
	struct fbr_ev_watcher watcher;
	struct fbr_destructor dtor = FBR_DESTRUCTOR_INITIALIZER;

	ev_io_init(&io, NULL, fd, EV_WRITE);
	ev_io_start(fctx->__p->loop, &io);
	dtor.func = watcher_io_dtor;
	dtor.arg = &io;
	fbr_destructor_add(FBR_A_ &dtor);

	fbr_ev_watcher_init(FBR_A_ &watcher, (ev_watcher *)&io);
	while (count != done) {
		fbr_ev_wait_one(FBR_A_ &watcher.ev_base);
		r = write(fd, buf + done, count - done);
		if (-1 == r) {
			if (EINTR == errno || EAGAIN == errno)
				continue;
			goto error;
		}
		done += r;
	}
	fbr_destructor_remove(FBR_A_ &dtor, 0 /* do not call it */);
	ev_io_stop(fctx->__p->loop, &io);
	return (ssize_t)done;

error:
	fbr_destructor_remove(FBR_A_ &dtor, 0 /* do not call it */);
	ev_io_stop(fctx->__p->loop, &io);
	return -1;
}
ssize_t fbr_recvfrom(FBR_P_ int sockfd, void *buf, size_t len, int flags,
		struct sockaddr *src_addr, socklen_t *addrlen)
{
	ev_io io;
	struct fbr_ev_watcher watcher;
	struct fbr_destructor dtor = FBR_DESTRUCTOR_INITIALIZER;

	ev_io_init(&io, NULL, sockfd, EV_READ);
	ev_io_start(fctx->__p->loop, &io);
	dtor.func = watcher_io_dtor;
	dtor.arg = &io;
	fbr_destructor_add(FBR_A_ &dtor);

	fbr_ev_watcher_init(FBR_A_ &watcher, (ev_watcher *)&io);
	fbr_ev_wait_one(FBR_A_ &watcher.ev_base);

	fbr_destructor_remove(FBR_A_ &dtor, 0 /* do not call it */);
	ev_io_stop(fctx->__p->loop, &io);

	return recvfrom(sockfd, buf, len, flags, src_addr, addrlen);
}
ssize_t fbr_recv(FBR_P_ int sockfd, void *buf, size_t len, int flags)
{
	ev_io io;
	struct fbr_ev_watcher watcher;
	struct fbr_destructor dtor = FBR_DESTRUCTOR_INITIALIZER;

	ev_io_init(&io, NULL, sockfd, EV_READ);
	ev_io_start(fctx->__p->loop, &io);
	dtor.func = watcher_io_dtor;
	dtor.arg = &io;
	fbr_destructor_add(FBR_A_ &dtor);

	fbr_ev_watcher_init(FBR_A_ &watcher, (ev_watcher *)&io);
	fbr_ev_wait_one(FBR_A_ &watcher.ev_base);

	fbr_destructor_remove(FBR_A_ &dtor, 0 /* do not call it */);
	ev_io_stop(fctx->__p->loop, &io);

	return recv(sockfd, buf, len, flags);
}
ssize_t fbr_sendto(FBR_P_ int sockfd, const void *buf, size_t len, int flags,
		const struct sockaddr *dest_addr, socklen_t addrlen)
{
	ev_io io;
	struct fbr_ev_watcher watcher;
	struct fbr_destructor dtor = FBR_DESTRUCTOR_INITIALIZER;

	ev_io_init(&io, NULL, sockfd, EV_WRITE);
	ev_io_start(fctx->__p->loop, &io);
	dtor.func = watcher_io_dtor;
	dtor.arg = &io;
	fbr_destructor_add(FBR_A_ &dtor);

	fbr_ev_watcher_init(FBR_A_ &watcher, (ev_watcher *)&io);
	fbr_ev_wait_one(FBR_A_ &watcher.ev_base);

	fbr_destructor_remove(FBR_A_ &dtor, 0 /* do not call it */);
	ev_io_stop(fctx->__p->loop, &io);

	return sendto(sockfd, buf, len, flags, dest_addr, addrlen);
}
ssize_t fbr_send(FBR_P_ int sockfd, const void *buf, size_t len, int flags)
{
	ev_io io;
	struct fbr_ev_watcher watcher;
	struct fbr_destructor dtor = FBR_DESTRUCTOR_INITIALIZER;

	ev_io_init(&io, NULL, sockfd, EV_WRITE);
	ev_io_start(fctx->__p->loop, &io);
	dtor.func = watcher_io_dtor;
	dtor.arg = &io;
	fbr_destructor_add(FBR_A_ &dtor);

	fbr_ev_watcher_init(FBR_A_ &watcher, (ev_watcher *)&io);
	fbr_ev_wait_one(FBR_A_ &watcher.ev_base);

	fbr_destructor_remove(FBR_A_ &dtor, 0 /* do not call it */);
	ev_io_stop(fctx->__p->loop, &io);

	return send(sockfd, buf, len, flags);
}
int fbr_accept(FBR_P_ int sockfd, struct sockaddr *addr, socklen_t *addrlen)
{
	int r;
	ev_io io;
	struct fbr_ev_watcher watcher;
	struct fbr_destructor dtor = FBR_DESTRUCTOR_INITIALIZER;

	ev_io_init(&io, NULL, sockfd, EV_READ);
	ev_io_start(fctx->__p->loop, &io);
	dtor.func = watcher_io_dtor;
	dtor.arg = &io;
	fbr_destructor_add(FBR_A_ &dtor);

	fbr_ev_watcher_init(FBR_A_ &watcher, (ev_watcher *)&io);
	fbr_ev_wait_one(FBR_A_ &watcher.ev_base);

	do {
		r = accept(sockfd, addr, addrlen);
	} while (-1 == r && EINTR == errno);

	fbr_destructor_remove(FBR_A_ &dtor, 0 /* do not call it */);
	ev_io_stop(fctx->__p->loop, &io);
	return r;
}
ev_tstamp fbr_sleep(FBR_P_ ev_tstamp seconds)
{
	ev_timer timer;
	struct fbr_ev_watcher watcher;
	struct fbr_destructor dtor = FBR_DESTRUCTOR_INITIALIZER;
	ev_tstamp expected = ev_now(fctx->__p->loop) + seconds;

	ev_timer_init(&timer, NULL, seconds, 0.);
	ev_timer_start(fctx->__p->loop, &timer);
	dtor.func = watcher_timer_dtor;
	dtor.arg = &timer;
	fbr_destructor_add(FBR_A_ &dtor);

	fbr_ev_watcher_init(FBR_A_ &watcher, (ev_watcher *)&timer);
	fbr_ev_wait_one(FBR_A_ &watcher.ev_base);

	fbr_destructor_remove(FBR_A_ &dtor, 0 /* do not call it */);
	ev_timer_stop(fctx->__p->loop, &timer);

	return max(0., expected - ev_now(fctx->__p->loop));
}
static long get_page_size()
{
	long sz;
	sz = sysconf(_SC_PAGESIZE);
	return sz;
}

static size_t round_up_to_page_size(size_t size)
{
	long sz = get_page_size();
	size_t remainder;
	remainder = size % sz;
	if (0 == remainder)
		return size;
	return size + sz - remainder;
}
fbr_id_t fbr_create(FBR_P_ const char *name, fbr_fiber_func_t func, void *arg,
		size_t stack_size)
{
	struct fbr_fiber *fiber;
	if (!LIST_EMPTY(&fctx->__p->reclaimed)) {
		fiber = LIST_FIRST(&fctx->__p->reclaimed);
		LIST_REMOVE(fiber, entries.reclaimed);
	} else {
		fiber = malloc(sizeof(struct fbr_fiber));
		memset(fiber, 0x00, sizeof(struct fbr_fiber));
		if (0 == stack_size)
			stack_size = FBR_STACK_SIZE;
		stack_size = round_up_to_page_size(stack_size);
		fiber->stack = mmap(NULL, stack_size, PROT_READ | PROT_WRITE,
				MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
		if (MAP_FAILED == fiber->stack)
			err(EXIT_FAILURE, "mmap failed");
		fiber->stack_size = stack_size;
		(void)VALGRIND_STACK_REGISTER(fiber->stack, fiber->stack +
				stack_size);
		fiber->id = fctx->__p->last_id++;
	}
	coro_create(&fiber->ctx, (coro_func)call_wrapper, FBR_A, fiber->stack,
			fiber->stack_size);
	LIST_INIT(&fiber->children);
	LIST_INIT(&fiber->pool);
	TAILQ_INIT(&fiber->destructors);
	strncpy(fiber->name, name, FBR_MAX_FIBER_NAME);
	fiber->func = func;
	fiber->func_arg = arg;
	LIST_INSERT_HEAD(&CURRENT_FIBER->children, fiber, entries.children);
	fiber->parent = CURRENT_FIBER;
	fiber->no_reclaim = 0;
	fiber->want_reclaim = 0;
	return fbr_id_pack(fiber);
}
int fbr_disown(FBR_P_ fbr_id_t parent_id)
{
	struct fbr_fiber *fiber, *parent;

	if (!fbr_id_isnull(parent_id))
		unpack_transfer_errno(-1, &parent, parent_id);
	else
		parent = &fctx->__p->root;

	fiber = CURRENT_FIBER;
	LIST_REMOVE(fiber, entries.children);
	LIST_INSERT_HEAD(&parent->children, fiber, entries.children);
	fiber->parent = parent;
	return_success(0);
}

fbr_id_t fbr_parent(FBR_P)
{
	struct fbr_fiber *fiber = CURRENT_FIBER;
	if (fiber->parent == &fctx->__p->root)
		return FBR_ID_NULL;
	return fbr_id_pack(fiber->parent);
}
void *fbr_calloc(FBR_P_ unsigned int nmemb, size_t size)
{
	void *ptr;
	fprintf(stderr, "libevfibers: fbr_calloc is deprecated\n");
	ptr = allocate_in_fiber(FBR_A_ nmemb * size, CURRENT_FIBER);
	memset(ptr, 0x00, nmemb * size);
	return ptr;
}

void *fbr_alloc(FBR_P_ size_t size)
{
	fprintf(stderr, "libevfibers: fbr_alloc is deprecated\n");
	return allocate_in_fiber(FBR_A_ size, CURRENT_FIBER);
}

void fbr_alloc_set_destructor(_unused_ FBR_P_ void *ptr,
		fbr_alloc_destructor_func_t func, void *context)
{
	struct mem_pool *pool_entry;
	fprintf(stderr, "libevfibers:"
			" fbr_alloc_set_destructor is deprecated\n");
	pool_entry = (struct mem_pool *)ptr - 1;
	pool_entry->destructor = func;
	pool_entry->destructor_context = context;
}

void fbr_free(FBR_P_ void *ptr)
{
	fprintf(stderr, "libevfibers: fbr_free is deprecated\n");
	fbr_free_in_fiber(FBR_A_ CURRENT_FIBER, ptr, 1);
}

void fbr_free_nd(FBR_P_ void *ptr)
{
	fprintf(stderr, "libevfibers: fbr_free_nd is deprecated\n");
	fbr_free_in_fiber(FBR_A_ CURRENT_FIBER, ptr, 0);
}
void fbr_dump_stack(FBR_P_ fbr_logutil_func_t log)
{
	struct fbr_stack_item *ptr = fctx->__p->sp;
	(*log)(FBR_A_ "%s", "Fiber call stack:");
	(*log)(FBR_A_ "%s", "-------------------------------");
	while (ptr >= fctx->__p->stack) {
		(*log)(FBR_A_ "fiber_call: %p\t%s",
				ptr->fiber,
				ptr->fiber->name);
		print_trace_info(FBR_A_ &ptr->tinfo, log);
		(*log)(FBR_A_ "%s", "-------------------------------");
		ptr--;
	}
}
static void transfer_later(FBR_P_ struct fbr_id_tailq_i *item)
{
	int was_empty;
	was_empty = TAILQ_EMPTY(&fctx->__p->pending_fibers);
	TAILQ_INSERT_TAIL(&fctx->__p->pending_fibers, item, entries);
	item->head = &fctx->__p->pending_fibers;
	if (was_empty && !TAILQ_EMPTY(&fctx->__p->pending_fibers)) {
		ev_async_start(fctx->__p->loop, &fctx->__p->pending_async);
	}
	ev_async_send(fctx->__p->loop, &fctx->__p->pending_async);
}

static void transfer_later_tailq(FBR_P_ struct fbr_id_tailq *tailq)
{
	int was_empty;
	struct fbr_id_tailq_i *item;
	TAILQ_FOREACH(item, tailq, entries) {
		item->head = &fctx->__p->pending_fibers;
	}
	was_empty = TAILQ_EMPTY(&fctx->__p->pending_fibers);
	TAILQ_CONCAT(&fctx->__p->pending_fibers, tailq, entries);
	if (was_empty && !TAILQ_EMPTY(&fctx->__p->pending_fibers)) {
		ev_async_start(fctx->__p->loop, &fctx->__p->pending_async);
	}
	ev_async_send(fctx->__p->loop, &fctx->__p->pending_async);
}
void fbr_mutex_init(_unused_ FBR_P_ struct fbr_mutex *mutex)
{
	mutex->locked_by = FBR_ID_NULL;
	TAILQ_INIT(&mutex->pending);
}

void fbr_mutex_lock(FBR_P_ struct fbr_mutex *mutex)
{
	struct fbr_ev_mutex ev;

	fbr_ev_mutex_init(FBR_A_ &ev, mutex);
	fbr_ev_wait_one(FBR_A_ &ev.ev_base);
	assert(fbr_id_eq(mutex->locked_by, CURRENT_FIBER_ID));
}

int fbr_mutex_trylock(FBR_P_ struct fbr_mutex *mutex)
{
	if (fbr_id_isnull(mutex->locked_by)) {
		mutex->locked_by = CURRENT_FIBER_ID;
		return 1;
	}
	return 0;
}

void fbr_mutex_unlock(FBR_P_ struct fbr_mutex *mutex)
{
	struct fbr_id_tailq_i *item, *x;
	struct fbr_fiber *fiber = NULL;

	if (TAILQ_EMPTY(&mutex->pending)) {
		mutex->locked_by = FBR_ID_NULL;
		return;
	}

	TAILQ_FOREACH_SAFE(item, &mutex->pending, entries, x) {
		assert(item->head == &mutex->pending);
		TAILQ_REMOVE(&mutex->pending, item, entries);
		if (-1 == fbr_id_unpack(FBR_A_ &fiber, item->id)) {
			fbr_log_e(FBR_A_ "libevfibers: unexpected error trying"
					" to find a fiber by id: %s",
					fbr_strerror(FBR_A_ fctx->f_errno));
			continue;
		}
		break;
	}

	mutex->locked_by = item->id;
	post_ev(FBR_A_ fiber, item->ev);

	transfer_later(FBR_A_ item);
}
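/*
 * Illustrative usage sketch (assumption, not part of the original source):
 * fbr_mutex only serializes fibers of one context, so it is typically
 * embedded in shared state and locked around sections that may call other
 * blocking fbr_* functions.
 *
 *     struct fbr_mutex mutex;
 *     fbr_mutex_init(&context, &mutex);
 *
 *     // inside a fiber:
 *     fbr_mutex_lock(FBR_A_ &mutex);
 *     // ... touch shared data, possibly yielding ...
 *     fbr_mutex_unlock(FBR_A_ &mutex);
 *
 *     fbr_mutex_destroy(&context, &mutex);
 */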
void fbr_cond_init(_unused_ FBR_P_ struct fbr_cond_var *cond)
{
	TAILQ_INIT(&cond->waiting);
}

int fbr_cond_wait(FBR_P_ struct fbr_cond_var *cond, struct fbr_mutex *mutex)
{
	struct fbr_ev_cond_var ev;

	if (fbr_id_isnull(mutex->locked_by))
		return_error(-1, FBR_EINVAL);

	fbr_ev_cond_var_init(FBR_A_ &ev, cond, mutex);
	fbr_ev_wait_one(FBR_A_ &ev.ev_base);
	fbr_mutex_lock(FBR_A_ mutex);
	return_success(0);
}

void fbr_cond_broadcast(FBR_P_ struct fbr_cond_var *cond)
{
	struct fbr_id_tailq_i *item;
	struct fbr_fiber *fiber;

	if (TAILQ_EMPTY(&cond->waiting))
		return;
	TAILQ_FOREACH(item, &cond->waiting, entries) {
		if (-1 == fbr_id_unpack(FBR_A_ &fiber, item->id)) {
			assert(FBR_ENOFIBER == fctx->f_errno);
			continue;
		}
		post_ev(FBR_A_ fiber, item->ev);
	}
	transfer_later_tailq(FBR_A_ &cond->waiting);
}

void fbr_cond_signal(FBR_P_ struct fbr_cond_var *cond)
{
	struct fbr_id_tailq_i *item;
	struct fbr_fiber *fiber;

	if (TAILQ_EMPTY(&cond->waiting))
		return;
	item = TAILQ_FIRST(&cond->waiting);
	if (-1 == fbr_id_unpack(FBR_A_ &fiber, item->id)) {
		assert(FBR_ENOFIBER == fctx->f_errno);
		return;
	}
	post_ev(FBR_A_ fiber, item->ev);

	assert(item->head == &cond->waiting);
	TAILQ_REMOVE(&cond->waiting, item, entries);
	transfer_later(FBR_A_ item);
}
int fbr_vrb_init(struct fbr_vrb *vrb, size_t size, const char *file_pattern)
{
	int fd = -1;
	size_t sz = get_page_size();
	char *temp_name;
	void *ptr;

	/* Map the backing file twice, back to back, so ring-buffer contents
	 * always appear contiguous in virtual memory. */
	size = (size ? round_up_to_page_size(size) : sz);
	temp_name = strdup(file_pattern);

	vrb->mem_ptr_size = size * 2 + sz * 2;
	vrb->mem_ptr = mmap(NULL, vrb->mem_ptr_size, PROT_NONE,
			MAP_ANONYMOUS | MAP_PRIVATE, -1, 0);
	if (MAP_FAILED == vrb->mem_ptr) {
		free(temp_name);
		return -1;
	}
	vrb->lower_ptr = vrb->mem_ptr + sz;
	vrb->upper_ptr = vrb->lower_ptr + size;
	vrb->ptr_size = size;
	vrb->data_ptr = vrb->lower_ptr;
	vrb->space_ptr = vrb->lower_ptr;

	fd = mkstemp(temp_name);
	if (0 > fd) {
		munmap(vrb->mem_ptr, vrb->mem_ptr_size);
		free(temp_name);
		return -1;
	}

	if (0 > unlink(temp_name)) {
		munmap(vrb->mem_ptr, vrb->mem_ptr_size);
		close(fd);
		free(temp_name);
		return -1;
	}
	free(temp_name);

	if (0 > ftruncate(fd, size)) {
		munmap(vrb->mem_ptr, vrb->mem_ptr_size);
		close(fd);
		return -1;
	}

	ptr = mmap(vrb->lower_ptr, size, PROT_READ | PROT_WRITE,
			MAP_FIXED | MAP_SHARED, fd, 0);
	if (MAP_FAILED == ptr) {
		munmap(vrb->mem_ptr, vrb->mem_ptr_size);
		close(fd);
		return -1;
	}

	if (ptr != vrb->lower_ptr) {
		munmap(vrb->lower_ptr, vrb->ptr_size);
		munmap(vrb->mem_ptr, vrb->mem_ptr_size);
		close(fd);
		return -1;
	}

	ptr = mmap(vrb->upper_ptr, size, PROT_READ | PROT_WRITE,
			MAP_FIXED | MAP_SHARED, fd, 0);
	if (MAP_FAILED == ptr) {
		munmap(vrb->lower_ptr, vrb->ptr_size);
		munmap(vrb->mem_ptr, vrb->mem_ptr_size);
		close(fd);
		return -1;
	}

	if (ptr != vrb->upper_ptr) {
		munmap(vrb->upper_ptr, vrb->ptr_size);
		munmap(vrb->lower_ptr, vrb->ptr_size);
		munmap(vrb->mem_ptr, vrb->mem_ptr_size);
		close(fd);
		return -1;
	}

	close(fd);
	return 0;
}
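/*
 * Explanatory sketch of the double mapping above (comment added for clarity,
 * not part of the original source): the same temporary file backs both the
 * lower and the upper half of the reserved region, so a chunk that logically
 * wraps around the end of the ring is still contiguous in virtual memory:
 *
 *     lower_ptr                upper_ptr
 *     |<------- size -------->|<------- size -------->|
 *     [ file pages 0..size-1 ][ the same pages again  ]
 *
 * A write that starts near the end of the lower mapping simply continues
 * into the upper mapping and lands at offset 0 of the backing file.
 */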
int fbr_buffer_init(FBR_P_ struct fbr_buffer *buffer, size_t size)
{
	int rv;
	rv = fbr_vrb_init(&buffer->vrb, size, fctx->__p->buffer_file_pattern);
	if (rv)
		return_error(-1, FBR_EBUFFERMMAP);

	buffer->prepared_bytes = 0;
	buffer->waiting_bytes = 0;
	fbr_cond_init(FBR_A_ &buffer->committed_cond);
	fbr_cond_init(FBR_A_ &buffer->bytes_freed_cond);
	fbr_mutex_init(FBR_A_ &buffer->write_mutex);
	fbr_mutex_init(FBR_A_ &buffer->read_mutex);
	return_success(0);
}

void fbr_vrb_destroy(struct fbr_vrb *vrb)
{
	munmap(vrb->upper_ptr, vrb->ptr_size);
	munmap(vrb->lower_ptr, vrb->ptr_size);
	munmap(vrb->mem_ptr, vrb->mem_ptr_size);
}

void fbr_buffer_destroy(FBR_P_ struct fbr_buffer *buffer)
{
	fbr_vrb_destroy(&buffer->vrb);
	fbr_mutex_destroy(FBR_A_ &buffer->read_mutex);
	fbr_mutex_destroy(FBR_A_ &buffer->write_mutex);
	fbr_cond_destroy(FBR_A_ &buffer->committed_cond);
	fbr_cond_destroy(FBR_A_ &buffer->bytes_freed_cond);
}

void *fbr_buffer_alloc_prepare(FBR_P_ struct fbr_buffer *buffer, size_t size)
{
	if (size > fbr_buffer_size(FBR_A_ buffer))
		return_error(NULL, FBR_EINVAL);

	fbr_mutex_lock(FBR_A_ &buffer->write_mutex);

	while (buffer->prepared_bytes > 0)
		fbr_cond_wait(FBR_A_ &buffer->committed_cond,
				&buffer->write_mutex);

	assert(0 == buffer->prepared_bytes);

	buffer->prepared_bytes = size;

	while (fbr_buffer_free_bytes(FBR_A_ buffer) < size)
		fbr_cond_wait(FBR_A_ &buffer->bytes_freed_cond,
				&buffer->write_mutex);

	return fbr_buffer_space_ptr(FBR_A_ buffer);
}

void fbr_buffer_alloc_commit(FBR_P_ struct fbr_buffer *buffer)
{
	fbr_vrb_give(&buffer->vrb, buffer->prepared_bytes);
	buffer->prepared_bytes = 0;
	fbr_cond_signal(FBR_A_ &buffer->committed_cond);
	fbr_mutex_unlock(FBR_A_ &buffer->write_mutex);
}

void fbr_buffer_alloc_abort(FBR_P_ struct fbr_buffer *buffer)
{
	buffer->prepared_bytes = 0;
	fbr_cond_signal(FBR_A_ &buffer->committed_cond);
	fbr_mutex_unlock(FBR_A_ &buffer->write_mutex);
}
void *fbr_buffer_read_address(FBR_P_ struct fbr_buffer *buffer, size_t size)
{
	int retval;
	if (size > fbr_buffer_size(FBR_A_ buffer))
		return_error(NULL, FBR_EINVAL);

	fbr_mutex_lock(FBR_A_ &buffer->read_mutex);

	while (fbr_buffer_bytes(FBR_A_ buffer) < size) {
		retval = fbr_cond_wait(FBR_A_ &buffer->committed_cond,
				&buffer->read_mutex);
		assert(0 == retval);
	}

	buffer->waiting_bytes = size;

	return_success(fbr_buffer_data_ptr(FBR_A_ buffer));
}

void fbr_buffer_read_advance(FBR_P_ struct fbr_buffer *buffer)
{
	fbr_vrb_take(&buffer->vrb, buffer->waiting_bytes);

	fbr_cond_signal(FBR_A_ &buffer->bytes_freed_cond);
	fbr_mutex_unlock(FBR_A_ &buffer->read_mutex);
}

void fbr_buffer_read_discard(FBR_P_ struct fbr_buffer *buffer)
{
	fbr_mutex_unlock(FBR_A_ &buffer->read_mutex);
}
int fbr_buffer_resize(FBR_P_ struct fbr_buffer *buffer, size_t size)
{
	int rv;
	fbr_mutex_lock(FBR_A_ &buffer->read_mutex);
	fbr_mutex_lock(FBR_A_ &buffer->write_mutex);
	rv = fbr_vrb_resize(&buffer->vrb, size, fctx->__p->buffer_file_pattern);
	fbr_mutex_unlock(FBR_A_ &buffer->write_mutex);
	fbr_mutex_unlock(FBR_A_ &buffer->read_mutex);
	if (rv)
		return_error(-1, FBR_EBUFFERMMAP);
	return_success(0);
}
void *fbr_get_user_data(FBR_P_ fbr_id_t id)
{
	struct fbr_fiber *fiber;
	unpack_transfer_errno(NULL, &fiber, id);
	return_success(fiber->user_data);
}

int fbr_set_user_data(FBR_P_ fbr_id_t id, void *data)
{
	struct fbr_fiber *fiber;
	unpack_transfer_errno(-1, &fiber, id);
	fiber->user_data = data;
	return_success(0);
}

void fbr_destructor_add(FBR_P_ struct fbr_destructor *dtor)
{
	struct fbr_fiber *fiber = CURRENT_FIBER;
	TAILQ_INSERT_TAIL(&fiber->destructors, dtor, entries);
	dtor->active = 1;
}

void fbr_destructor_remove(FBR_P_ struct fbr_destructor *dtor, int call)
{
	struct fbr_fiber *fiber = CURRENT_FIBER;

	if (0 == dtor->active)
		return;

	TAILQ_REMOVE(&fiber->destructors, dtor, entries);
	if (call)
		dtor->func(FBR_A_ dtor->arg);
	dtor->active = 0;
}
static inline int wrap_ffsll(uint64_t val)
{
	return __builtin_ffsll(val);
}

static inline int is_key_registered(FBR_P_ fbr_key_t key)
{
	return 0 == (fctx->__p->key_free_mask & (1 << key));
}

static inline void register_key(FBR_P_ fbr_key_t key)
{
	fctx->__p->key_free_mask &= ~(1 << key);
}

static inline void unregister_key(FBR_P_ fbr_key_t key)
{
	fctx->__p->key_free_mask |= (1 << key);
}

int fbr_key_create(FBR_P_ fbr_key_t *key_ptr)
{
	fbr_key_t key = wrap_ffsll(fctx->__p->key_free_mask) - 1;
	assert(key < FBR_MAX_KEY);
	register_key(FBR_A_ key);
	*key_ptr = key;
	return_success(0);
}

int fbr_key_delete(FBR_P_ fbr_key_t key)
{
	if (!is_key_registered(FBR_A_ key))
		return_error(-1, FBR_ENOKEY);

	unregister_key(FBR_A_ key);
	return_success(0);
}

int fbr_key_set(FBR_P_ fbr_id_t id, fbr_key_t key, void *value)
{
	struct fbr_fiber *fiber;

	unpack_transfer_errno(-1, &fiber, id);

	if (!is_key_registered(FBR_A_ key))
		return_error(-1, FBR_ENOKEY);

	fiber->key_data[key] = value;
	return_success(0);
}

void *fbr_key_get(FBR_P_ fbr_id_t id, fbr_key_t key)
{
	struct fbr_fiber *fiber;

	unpack_transfer_errno(NULL, &fiber, id);

	if (!is_key_registered(FBR_A_ key))
		return_error(NULL, FBR_ENOKEY);

	return fiber->key_data[key];
}

const char *fbr_get_name(FBR_P_ fbr_id_t id)
{
	struct fbr_fiber *fiber;
	unpack_transfer_errno(NULL, &fiber, id);
	return_success(fiber->name);
}

int fbr_set_name(FBR_P_ fbr_id_t id, const char *name)
{
	struct fbr_fiber *fiber;
	unpack_transfer_errno(-1, &fiber, id);
	strncpy(fiber->name, name, FBR_MAX_FIBER_NAME);
	return_success(0);
}
static int make_pipe(FBR_P_ int *r, int *w)
{
	int fds[2];
	int retval;

	retval = pipe(fds);
	if (-1 == retval)
		return_error(-1, FBR_ESYSTEM);
	*r = fds[0];
	*w = fds[1];
	return_success(0);
}
pid_t fbr_popen3(FBR_P_ const char *filename, char *const argv[],
		char *const envp[], const char *working_dir,
		int *stdin_w_ptr, int *stdout_r_ptr, int *stderr_r_ptr)
{
	pid_t pid;
	int retval;
	int devnull = -1;
	int stdin_r = 0, stdin_w = 0;
	int stdout_r = 0, stdout_w = 0;
	int stderr_r = 0, stderr_w = 0;

	if (!stdin_w_ptr || !stdout_r_ptr || !stderr_r_ptr)
		devnull = open("/dev/null", O_WRONLY);

	retval = (stdin_w_ptr ? make_pipe(FBR_A_ &stdin_r, &stdin_w) : 0);
	if (retval)
		return retval;
	retval = (stdout_r_ptr ? make_pipe(FBR_A_ &stdout_r, &stdout_w) : 0);
	if (retval)
		return retval;
	retval = (stderr_r_ptr ? make_pipe(FBR_A_ &stderr_r, &stderr_w) : 0);
	if (retval)
		return retval;

	pid = fork();
	if (-1 == pid)
		return_error(-1, FBR_ESYSTEM);
	if (0 == pid) {
		/* Child */
		ev_break(EV_DEFAULT, EVBREAK_ALL);

		if (stdin_w_ptr) {
			retval = close(stdin_w);
			if (-1 == retval)
				err(EXIT_FAILURE, "close");
			retval = dup2(stdin_r, STDIN_FILENO);
			if (-1 == retval)
				err(EXIT_FAILURE, "dup2");
		} else {
			devnull = open("/dev/null", O_RDONLY);
			if (-1 == devnull)
				err(EXIT_FAILURE, "open");
			retval = dup2(devnull, STDIN_FILENO);
			if (-1 == retval)
				err(EXIT_FAILURE, "dup2");
		}
		if (stdout_r_ptr) {
			retval = close(stdout_r);
			if (-1 == retval)
				err(EXIT_FAILURE, "close");
			retval = dup2(stdout_w, STDOUT_FILENO);
			if (-1 == retval)
				err(EXIT_FAILURE, "dup2");
		} else {
			devnull = open("/dev/null", O_WRONLY);
			if (-1 == devnull)
				err(EXIT_FAILURE, "open");
			retval = dup2(devnull, STDOUT_FILENO);
			if (-1 == retval)
				err(EXIT_FAILURE, "dup2");
		}
		if (stderr_r_ptr) {
			retval = close(stderr_r);
			if (-1 == retval)
				err(EXIT_FAILURE, "close");
			retval = dup2(stderr_w, STDERR_FILENO);
			if (-1 == retval)
				err(EXIT_FAILURE, "dup2");
		} else {
			devnull = open("/dev/null", O_WRONLY);
			if (-1 == devnull)
				err(EXIT_FAILURE, "open");
			/* was dup2(stderr_w, ...): redirect to the freshly
			 * opened /dev/null descriptor instead */
			retval = dup2(devnull, STDERR_FILENO);
			if (-1 == retval)
				err(EXIT_FAILURE, "dup2");
		}

		if (NULL != working_dir) {
			retval = chdir(working_dir);
			if (-1 == retval)
				err(EXIT_FAILURE, "chdir");
		}

		retval = execve(filename, argv, envp);
		if (-1 == retval)
			err(EXIT_FAILURE, "execve");
		errx(EXIT_FAILURE, "execve failed without error code");
	}
	/* Parent */
	if (stdin_w_ptr) {
		retval = close(stdin_r);
		if (-1 == retval)
			return_error(-1, FBR_ESYSTEM);
		retval = fbr_fd_nonblock(FBR_A_ stdin_w);
		if (retval)
			return retval;
	}
	if (stdout_r_ptr) {
		retval = close(stdout_w);
		if (-1 == retval)
			return_error(-1, FBR_ESYSTEM);
		retval = fbr_fd_nonblock(FBR_A_ stdout_r);
		if (retval)
			return retval;
	}
	if (stderr_r_ptr) {
		retval = close(stderr_w);
		if (-1 == retval)
			return_error(-1, FBR_ESYSTEM);
		retval = fbr_fd_nonblock(FBR_A_ stderr_r);
		if (retval)
			return retval;
	}

	if (stdin_w_ptr)
		*stdin_w_ptr = stdin_w;
	if (stdout_r_ptr)
		*stdout_r_ptr = stdout_r;
	if (stderr_r_ptr)
		*stderr_r_ptr = stderr_r;

	return pid;
}
static void watcher_child_dtor(_unused_ FBR_P_ void *_arg)
{
	struct ev_child *w = _arg;
	ev_child_stop(fctx->__p->loop, w);
}
int fbr_waitpid(FBR_P_ pid_t pid)
{
	struct ev_child child;
	struct fbr_ev_watcher watcher;
	struct fbr_destructor dtor = FBR_DESTRUCTOR_INITIALIZER;

	ev_child_init(&child, NULL, pid, 0.);
	ev_child_start(fctx->__p->loop, &child);
	dtor.func = watcher_child_dtor;
	dtor.arg = &child;
	fbr_destructor_add(FBR_A_ &dtor);

	fbr_ev_watcher_init(FBR_A_ &watcher, (ev_watcher *)&child);
	fbr_ev_wait_one(FBR_A_ &watcher.ev_base);

	fbr_destructor_remove(FBR_A_ &dtor, 0 /* do not call it */);
	ev_child_stop(fctx->__p->loop, &child);
	return_success(child.rstatus);
}
int fbr_system(FBR_P_ const char *filename, char *const argv[],
		char *const envp[], const char *working_dir)
{
	pid_t pid;
	int retval;

	pid = fork();
	if (-1 == pid)
		return_error(-1, FBR_ESYSTEM);
	if (0 == pid) {
		/* Child */
		ev_break(EV_DEFAULT, EVBREAK_ALL);

		if (NULL != working_dir) {
			retval = chdir(working_dir);
			if (-1 == retval)
				err(EXIT_FAILURE, "chdir");
		}

		retval = execve(filename, argv, envp);
		if (-1 == retval)
			err(EXIT_FAILURE, "execve");
		errx(EXIT_FAILURE, "execve failed without error code");
	}
	return fbr_waitpid(FBR_A_ pid);
}
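/*
 * Illustrative usage sketch (assumption, not part of the original source):
 * fbr_system() blocks only the calling fiber while the child process runs.
 *
 *     char *argv[] = {"/bin/sleep", "5", NULL};
 *     char *envp[] = {NULL};
 *     int status = fbr_system(FBR_A_ "/bin/sleep", argv, envp, NULL);
 *     if (status >= 0 && WIFEXITED(status))
 *         fbr_log_n(FBR_A_ "child exited with %d", WEXITSTATUS(status));
 */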
#ifdef FBR_EIO_ENABLED

static struct ev_loop *eio_loop;
static ev_idle repeat_watcher;
static ev_async ready_watcher;

/* idle watcher: keep polling eio while requests are still outstanding */
static void repeat(EV_P_ ev_idle *w, _unused_ int revents)
{
	if (eio_poll() != -1)
		ev_idle_stop(EV_A_ w);
}

/* async watcher: eio has results ready, process them */
static void ready(EV_P_ _unused_ ev_async *w, _unused_ int revents)
{
	if (eio_poll() == -1)
		ev_idle_start(EV_A_ &repeat_watcher);
}

/* called by eio from its pool threads to wake up the event loop */
static void want_poll()
{
	ev_async_send(eio_loop, &ready_watcher);
}

void fbr_eio_init()
{
	if (NULL != eio_loop) {
		fprintf(stderr, "libevfibers: fbr_eio_init called twice");
		abort();
	}
	eio_loop = EV_DEFAULT;
	ev_idle_init(&repeat_watcher, repeat);
	ev_async_init(&ready_watcher, ready);
	ev_async_start(eio_loop, &ready_watcher);
	eio_init(want_poll, 0);
}
static void eio_req_dtor(_unused_ FBR_P_ void *_arg)
{
	eio_req *req = _arg;
	eio_cancel(req);
}

static int fiber_eio_cb(eio_req *req)
{
	struct fbr_fiber *fiber;
	struct fbr_ev_eio *ev = req->data;
	struct fbr_context *fctx = ev->ev_base.fctx;
	int retval;

	ev_unref(eio_loop);

	if (EIO_CANCELLED(req))
		return 0;

	retval = fbr_id_unpack(FBR_A_ &fiber, ev->ev_base.id);
	if (-1 == retval) {
		fbr_log_e(FBR_A_ "libevfibers: fiber is about to be called by"
				" the eio callback, but its id is not valid: %s",
				fbr_strerror(FBR_A_ fctx->f_errno));
		abort();
	}

	post_ev(FBR_A_ fiber, &ev->ev_base);

	retval = fbr_transfer(FBR_A_ fbr_id_pack(fiber));
	assert(0 == retval);
	return 0;
}

#define FBR_EIO_PREP \
	eio_req *req; \
	struct fbr_ev_eio e_eio; \
	int retval; \
	struct fbr_destructor dtor = FBR_DESTRUCTOR_INITIALIZER; \
	ev_ref(eio_loop);

#define FBR_EIO_WAIT \
	if (NULL == req) { \
		ev_unref(eio_loop); \
		return_error(-1, FBR_EEIO); \
	} \
	dtor.func = eio_req_dtor; \
	dtor.arg = req; \
	fbr_destructor_add(FBR_A_ &dtor); \
	fbr_ev_eio_init(FBR_A_ &e_eio, req); \
	retval = fbr_ev_wait_one(FBR_A_ &e_eio.ev_base); \
	fbr_destructor_remove(FBR_A_ &dtor, 0 /* do not call it */); \
	if (retval) \
		return retval;

#define FBR_EIO_RESULT_CHECK \
	if (0 > req->result) { \
		errno = req->errorno; \
		return_error(-1, FBR_ESYSTEM); \
	}

#define FBR_EIO_RESULT_RET \
	FBR_EIO_RESULT_CHECK \
	return req->result;
int fbr_eio_open(FBR_P_ const char *path, int flags, mode_t mode, int pri)
{
	FBR_EIO_PREP;
	req = eio_open(path, flags, mode, pri, fiber_eio_cb, &e_eio);
	FBR_EIO_WAIT;
	FBR_EIO_RESULT_RET;
}

int fbr_eio_truncate(FBR_P_ const char *path, off_t offset, int pri)
{
	FBR_EIO_PREP;
	req = eio_truncate(path, offset, pri, fiber_eio_cb, &e_eio);
	FBR_EIO_WAIT;
	FBR_EIO_RESULT_RET;
}

int fbr_eio_chown(FBR_P_ const char *path, uid_t uid, gid_t gid, int pri)
{
	FBR_EIO_PREP;
	req = eio_chown(path, uid, gid, pri, fiber_eio_cb, &e_eio);
	FBR_EIO_WAIT;
	FBR_EIO_RESULT_RET;
}

int fbr_eio_chmod(FBR_P_ const char *path, mode_t mode, int pri)
{
	FBR_EIO_PREP;
	req = eio_chmod(path, mode, pri, fiber_eio_cb, &e_eio);
	FBR_EIO_WAIT;
	FBR_EIO_RESULT_RET;
}

int fbr_eio_mkdir(FBR_P_ const char *path, mode_t mode, int pri)
{
	FBR_EIO_PREP;
	req = eio_mkdir(path, mode, pri, fiber_eio_cb, &e_eio);
	FBR_EIO_WAIT;
	FBR_EIO_RESULT_RET;
}

int fbr_eio_rmdir(FBR_P_ const char *path, int pri)
{
	FBR_EIO_PREP;
	req = eio_rmdir(path, pri, fiber_eio_cb, &e_eio);
	FBR_EIO_WAIT;
	FBR_EIO_RESULT_RET;
}

int fbr_eio_unlink(FBR_P_ const char *path, int pri)
{
	FBR_EIO_PREP;
	req = eio_unlink(path, pri, fiber_eio_cb, &e_eio);
	FBR_EIO_WAIT;
	FBR_EIO_RESULT_RET;
}

int fbr_eio_utime(FBR_P_ const char *path, eio_tstamp atime, eio_tstamp mtime,
		int pri)
{
	FBR_EIO_PREP;
	req = eio_utime(path, atime, mtime, pri, fiber_eio_cb, &e_eio);
	FBR_EIO_WAIT;
	FBR_EIO_RESULT_RET;
}

int fbr_eio_mknod(FBR_P_ const char *path, mode_t mode, dev_t dev, int pri)
{
	FBR_EIO_PREP;
	req = eio_mknod(path, mode, dev, pri, fiber_eio_cb, &e_eio);
	FBR_EIO_WAIT;
	FBR_EIO_RESULT_RET;
}

int fbr_eio_link(FBR_P_ const char *path, const char *new_path, int pri)
{
	FBR_EIO_PREP;
	req = eio_link(path, new_path, pri, fiber_eio_cb, &e_eio);
	FBR_EIO_WAIT;
	FBR_EIO_RESULT_RET;
}

int fbr_eio_symlink(FBR_P_ const char *path, const char *new_path, int pri)
{
	FBR_EIO_PREP;
	req = eio_symlink(path, new_path, pri, fiber_eio_cb, &e_eio);
	FBR_EIO_WAIT;
	FBR_EIO_RESULT_RET;
}

int fbr_eio_rename(FBR_P_ const char *path, const char *new_path, int pri)
{
	FBR_EIO_PREP;
	req = eio_rename(path, new_path, pri, fiber_eio_cb, &e_eio);
	FBR_EIO_WAIT;
	FBR_EIO_RESULT_RET;
}

int fbr_eio_mlock(FBR_P_ void *addr, size_t length, int pri)
{
	FBR_EIO_PREP;
	req = eio_mlock(addr, length, pri, fiber_eio_cb, &e_eio);
	FBR_EIO_WAIT;
	FBR_EIO_RESULT_RET;
}

int fbr_eio_close(FBR_P_ int fd, int pri)
{
	FBR_EIO_PREP;
	req = eio_close(fd, pri, fiber_eio_cb, &e_eio);
	FBR_EIO_WAIT;
	FBR_EIO_RESULT_RET;
}

int fbr_eio_sync(FBR_P_ int pri)
{
	FBR_EIO_PREP;
	req = eio_sync(pri, fiber_eio_cb, &e_eio);
	FBR_EIO_WAIT;
	FBR_EIO_RESULT_RET;
}

int fbr_eio_fsync(FBR_P_ int fd, int pri)
{
	FBR_EIO_PREP;
	req = eio_fsync(fd, pri, fiber_eio_cb, &e_eio);
	FBR_EIO_WAIT;
	FBR_EIO_RESULT_RET;
}

int fbr_eio_fdatasync(FBR_P_ int fd, int pri)
{
	FBR_EIO_PREP;
	req = eio_fdatasync(fd, pri, fiber_eio_cb, &e_eio);
	FBR_EIO_WAIT;
	FBR_EIO_RESULT_RET;
}
int fbr_eio_futime(FBR_P_ int fd, eio_tstamp atime, eio_tstamp mtime, int pri)
{
	FBR_EIO_PREP;
	req = eio_futime(fd, atime, mtime, pri, fiber_eio_cb, &e_eio);
	FBR_EIO_WAIT;
	FBR_EIO_RESULT_RET;
}

int fbr_eio_ftruncate(FBR_P_ int fd, off_t offset, int pri)
{
	FBR_EIO_PREP;
	req = eio_ftruncate(fd, offset, pri, fiber_eio_cb, &e_eio);
	FBR_EIO_WAIT;
	FBR_EIO_RESULT_RET;
}

int fbr_eio_fchmod(FBR_P_ int fd, mode_t mode, int pri)
{
	FBR_EIO_PREP;
	req = eio_fchmod(fd, mode, pri, fiber_eio_cb, &e_eio);
	FBR_EIO_WAIT;
	FBR_EIO_RESULT_RET;
}

int fbr_eio_fchown(FBR_P_ int fd, uid_t uid, gid_t gid, int pri)
{
	FBR_EIO_PREP;
	req = eio_fchown(fd, uid, gid, pri, fiber_eio_cb, &e_eio);
	FBR_EIO_WAIT;
	FBR_EIO_RESULT_RET;
}

int fbr_eio_dup2(FBR_P_ int fd, int fd2, int pri)
{
	FBR_EIO_PREP;
	req = eio_dup2(fd, fd2, pri, fiber_eio_cb, &e_eio);
	FBR_EIO_WAIT;
	FBR_EIO_RESULT_RET;
}

ssize_t fbr_eio_seek(FBR_P_ int fd, off_t offset, int whence, int pri)
{
	FBR_EIO_PREP;
	req = eio_seek(fd, offset, whence, pri, fiber_eio_cb, &e_eio);
	FBR_EIO_WAIT;
	FBR_EIO_RESULT_CHECK;
	return req->offs;
}

ssize_t fbr_eio_read(FBR_P_ int fd, void *buf, size_t length, off_t offset,
		int pri)
{
	FBR_EIO_PREP;
	req = eio_read(fd, buf, length, offset, pri, fiber_eio_cb, &e_eio);
	FBR_EIO_WAIT;
	FBR_EIO_RESULT_RET;
}

ssize_t fbr_eio_write(FBR_P_ int fd, void *buf, size_t length, off_t offset,
		int pri)
{
	FBR_EIO_PREP;
	req = eio_write(fd, buf, length, offset, pri, fiber_eio_cb, &e_eio);
	FBR_EIO_WAIT;
	FBR_EIO_RESULT_RET;
}

int fbr_eio_mlockall(FBR_P_ int flags, int pri)
{
	FBR_EIO_PREP;
	req = eio_mlockall(flags, pri, fiber_eio_cb, &e_eio);
	FBR_EIO_WAIT;
	FBR_EIO_RESULT_RET;
}

int fbr_eio_msync(FBR_P_ void *addr, size_t length, int flags, int pri)
{
	FBR_EIO_PREP;
	req = eio_msync(addr, length, flags, pri, fiber_eio_cb, &e_eio);
	FBR_EIO_WAIT;
	FBR_EIO_RESULT_RET;
}
int fbr_eio_readlink(FBR_P_ const char *path, char *buf, size_t size, int pri)
{
	FBR_EIO_PREP;
	req = eio_readlink(path, pri, fiber_eio_cb, &e_eio);
	FBR_EIO_WAIT;
	FBR_EIO_RESULT_CHECK;
	strncpy(buf, req->ptr2, min(size, (size_t)req->result));
	return req->result;
}

int fbr_eio_realpath(FBR_P_ const char *path, char *buf, size_t size, int pri)
{
	FBR_EIO_PREP;
	req = eio_realpath(path, pri, fiber_eio_cb, &e_eio);
	FBR_EIO_WAIT;
	FBR_EIO_RESULT_CHECK;
	strncpy(buf, req->ptr2, min(size, (size_t)req->result));
	return req->result;
}
int fbr_eio_stat(FBR_P_ const char *path, EIO_STRUCT_STAT *statdata, int pri)
{
	EIO_STRUCT_STAT *st;
	FBR_EIO_PREP;
	req = eio_stat(path, pri, fiber_eio_cb, &e_eio);
	FBR_EIO_WAIT;
	FBR_EIO_RESULT_CHECK;
	st = (EIO_STRUCT_STAT *)req->ptr2;
	memcpy(statdata, st, sizeof(*st));
	return req->result;
}

int fbr_eio_lstat(FBR_P_ const char *path, EIO_STRUCT_STAT *statdata, int pri)
{
	EIO_STRUCT_STAT *st;
	FBR_EIO_PREP;
	req = eio_lstat(path, pri, fiber_eio_cb, &e_eio);
	FBR_EIO_WAIT;
	FBR_EIO_RESULT_CHECK;
	st = (EIO_STRUCT_STAT *)req->ptr2;
	memcpy(statdata, st, sizeof(*st));
	return req->result;
}

int fbr_eio_fstat(FBR_P_ int fd, EIO_STRUCT_STAT *statdata, int pri)
{
	EIO_STRUCT_STAT *st;
	FBR_EIO_PREP;
	req = eio_fstat(fd, pri, fiber_eio_cb, &e_eio);
	FBR_EIO_WAIT;
	FBR_EIO_RESULT_CHECK;
	st = (EIO_STRUCT_STAT *)req->ptr2;
	memcpy(statdata, st, sizeof(*st));
	return req->result;
}

int fbr_eio_statvfs(FBR_P_ const char *path, EIO_STRUCT_STATVFS *statdata,
		int pri)
{
	EIO_STRUCT_STATVFS *st;
	FBR_EIO_PREP;
	req = eio_statvfs(path, pri, fiber_eio_cb, &e_eio);
	FBR_EIO_WAIT;
	FBR_EIO_RESULT_CHECK;
	st = (EIO_STRUCT_STATVFS *)req->ptr2;
	memcpy(statdata, st, sizeof(*st));
	return req->result;
}

int fbr_eio_fstatvfs(FBR_P_ int fd, EIO_STRUCT_STATVFS *statdata, int pri)
{
	EIO_STRUCT_STATVFS *st;
	FBR_EIO_PREP;
	req = eio_fstatvfs(fd, pri, fiber_eio_cb, &e_eio);
	FBR_EIO_WAIT;
	FBR_EIO_RESULT_CHECK;
	st = (EIO_STRUCT_STATVFS *)req->ptr2;
	memcpy(statdata, st, sizeof(*st));
	return req->result;
}
int fbr_eio_sendfile(FBR_P_ int out_fd, int in_fd, off_t in_offset,
		size_t length, int pri)
{
	FBR_EIO_PREP;
	req = eio_sendfile(out_fd, in_fd, in_offset, length, pri, fiber_eio_cb,
			&e_eio);
	FBR_EIO_WAIT;
	FBR_EIO_RESULT_RET;
}

int fbr_eio_readahead(FBR_P_ int fd, off_t offset, size_t length, int pri)
{
	FBR_EIO_PREP;
	req = eio_readahead(fd, offset, length, pri, fiber_eio_cb, &e_eio);
	FBR_EIO_WAIT;
	FBR_EIO_RESULT_RET;
}

int fbr_eio_syncfs(FBR_P_ int fd, int pri)
{
	FBR_EIO_PREP;
	req = eio_syncfs(fd, pri, fiber_eio_cb, &e_eio);
	FBR_EIO_WAIT;
	FBR_EIO_RESULT_RET;
}

int fbr_eio_sync_file_range(FBR_P_ int fd, off_t offset, size_t nbytes,
		unsigned int flags, int pri)
{
	FBR_EIO_PREP;
	req = eio_sync_file_range(fd, offset, nbytes, flags, pri, fiber_eio_cb,
			&e_eio);
	FBR_EIO_WAIT;
	FBR_EIO_RESULT_RET;
}

int fbr_eio_fallocate(FBR_P_ int fd, int mode, off_t offset, off_t len, int pri)
{
	FBR_EIO_PREP;
	req = eio_fallocate(fd, mode, offset, len, pri, fiber_eio_cb, &e_eio);
	FBR_EIO_WAIT;
	FBR_EIO_RESULT_RET;
}
static void custom_execute_cb(eio_req *req)
{
	struct fbr_ev_eio *ev = req->data;
	req->result = ev->custom_func(ev->custom_arg);
}

ssize_t fbr_eio_custom(FBR_P_ fbr_eio_custom_func_t func, void *data, int pri)
{
	FBR_EIO_PREP;
	e_eio.custom_func = func;
	e_eio.custom_arg = data;
	req = eio_custom(custom_execute_cb, pri, fiber_eio_cb, &e_eio);
	FBR_EIO_WAIT;
	FBR_EIO_RESULT_RET;
}

#endif