31 #include <valgrind/valgrind.h>
33 #include <evfibers_private/fiber.h>
#ifndef LIST_FOREACH_SAFE
/* Deletion-safe LIST iteration (BSD queue(3)): next_var caches the
 * successor before the body runs, so the body may free/unlink var. */
#define LIST_FOREACH_SAFE(var, head, field, next_var) \
	for ((var) = ((head)->lh_first); \
			(var) && ((next_var) = ((var)->field.le_next), 1); \
			(var) = (next_var))
#endif
#ifndef TAILQ_FOREACH_SAFE
/* Deletion-safe TAILQ iteration (BSD queue(3)).  Uses the comma operator
 * instead of a GCC statement expression, so it is portable to any C
 * compiler while remaining behaviorally identical. */
#define TAILQ_FOREACH_SAFE(var, head, field, next_var) \
	for ((var) = ((head)->tqh_first); \
			(var) ? ((next_var) = ((var)->field.tqe_next), 1) : 0; \
			(var) = (next_var))
#endif
/* Debug check: abort unless the currently running fiber is the root. */
#define ENSURE_ROOT_FIBER do { \
	assert(fctx->__p->sp->fiber == &fctx->__p->root); \
} while (0)

/* Fiber at the top of the call stack, i.e. the one running right now. */
#define CURRENT_FIBER (fctx->__p->sp->fiber)
#define CURRENT_FIBER_ID (fbr_id_pack(CURRENT_FIBER))
/* True when the fiber one frame below the current one is the root. */
#define CALLED_BY_ROOT ((fctx->__p->sp - 1)->fiber == &fctx->__p->root)
/* Resolve `id' into `ptr'; on a stale/invalid id, return `value' from the
 * enclosing function (f_errno has already been set by fbr_id_unpack). */
#define unpack_transfer_errno(value, ptr, id) \
	do { \
		if (-1 == fbr_id_unpack(fctx, ptr, id)) \
			return (value); \
	} while (0)

/* Record success in the per-context errno and return `value'. */
#define return_success(value) \
	do { \
		fctx->f_errno = FBR_SUCCESS; \
		return (value); \
	} while (0)

/* Record `code' in the per-context errno and return `value'. */
#define return_error(value, code) \
	do { \
		fctx->f_errno = (code); \
		return (value); \
	} while (0)
79 static fbr_id_t fbr_id_pack(
struct fbr_fiber *fiber)
81 return ((__uint128_t)fiber->id << 64) | (uint64_t)fiber;
84 static int fbr_id_unpack(
FBR_P_ struct fbr_fiber **ptr,
fbr_id_t id)
86 struct fbr_fiber *fiber;
88 f_id = (uint64_t)(
id >> 64);
89 fiber = (
struct fbr_fiber *)(uint64_t)id;
90 if (fiber->id != f_id)
91 return_error(-1, FBR_ENOFIBER);
97 static void pending_async_cb(EV_P_ ev_async *w, _unused_
int revents)
100 struct fiber_id_tailq_i *item;
106 if (TAILQ_EMPTY(&fctx->
__p->pending_fibers)) {
107 ev_async_stop(EV_A_ &fctx->
__p->pending_async);
111 item = TAILQ_FIRST(&fctx->
__p->pending_fibers);
112 TAILQ_REMOVE(&fctx->
__p->pending_fibers, item, entries);
113 if (TAILQ_EMPTY(&fctx->
__p->pending_fibers))
114 ev_async_stop(EV_A_ &fctx->
__p->pending_async);
116 ev_async_send(EV_A_ &fctx->
__p->pending_async);
118 retval = fbr_transfer(
FBR_A_ item->id);
120 if (-1 == retval && FBR_ENOFIBER != fctx->
f_errno) {
122 " a fiber by id: %s",
127 static void *allocate_in_fiber(
FBR_P_ size_t size,
struct fbr_fiber *in)
129 struct mem_pool *pool_entry;
130 pool_entry = malloc(size +
sizeof(
struct mem_pool));
131 if (NULL == pool_entry) {
133 size +
sizeof(
struct mem_pool));
136 pool_entry->ptr = pool_entry;
137 pool_entry->destructor = NULL;
138 pool_entry->destructor_context = NULL;
139 LIST_INSERT_HEAD(&in->pool, pool_entry, entries);
140 return pool_entry + 1;
144 const char *format, va_list ap)
146 struct fbr_fiber *fiber;
150 if (level > logger->level)
153 fiber = CURRENT_FIBER;
160 case FBR_LOG_WARNING:
161 str_level =
"WARNING";
165 str_level =
"NOTICE";
181 fprintf(stream,
"%-7s %-16s ", str_level, fiber->name);
182 vfprintf(stream, format, ap);
183 fprintf(stream,
"\n");
188 struct fbr_fiber *root;
191 fctx->
__p = malloc(
sizeof(
struct fbr_context_private));
192 LIST_INIT(&fctx->
__p->reclaimed);
193 LIST_INIT(&fctx->
__p->root.children);
194 LIST_INIT(&fctx->
__p->root.pool);
195 TAILQ_INIT(&fctx->
__p->pending_fibers);
197 root = &fctx->
__p->root;
199 fctx->
__p->last_id = 0;
200 root->id = fctx->
__p->last_id++;
201 coro_create(&root->ctx, NULL, NULL, NULL, 0);
204 logger->
logv = stdio_logger;
205 logger->
level = FBR_LOG_NOTICE;
208 fctx->
__p->sp = fctx->
__p->stack;
209 fctx->
__p->sp->fiber = root;
210 fctx->
__p->backtraces_enabled = 1;
211 fill_trace_info(
FBR_A_ &fctx->
__p->sp->tinfo);
212 fctx->
__p->loop = loop;
213 fctx->
__p->pending_async.data = fctx;
214 fctx->
__p->backtraces_enabled = 0;
215 ev_async_init(&fctx->
__p->pending_async, pending_async_cb);
224 return "Invalid argument";
226 return "No such fiber";
228 return "System error, consult system errno";
229 case FBR_EBUFFERMMAP:
230 return "Failed to mmap two adjacent regions";
232 return "Unknown error";
238 va_start(ap, format);
246 va_start(ap, format);
254 va_start(ap, format);
262 va_start(ap, format);
270 va_start(ap, format);
275 static struct fiber_id_tailq_i *id_tailq_i_for(_unused_
FBR_P_
276 struct fbr_fiber *fiber)
278 struct fiber_id_tailq_i *item;
279 item = malloc(
sizeof(
struct fiber_id_tailq_i));
280 item->id = fbr_id_pack(fiber);
285 static void reclaim_children(
FBR_P_ struct fbr_fiber *fiber)
288 LIST_FOREACH(f, &fiber->children, entries.children) {
289 fbr_reclaim(
FBR_A_ fbr_id_pack(f));
295 struct fbr_fiber *fiber, *x;
297 reclaim_children(
FBR_A_ &fctx->
__p->root);
299 LIST_FOREACH_SAFE(fiber, &fctx->
__p->reclaimed, entries.reclaimed, x) {
300 if (0 != munmap(fiber->stack, fiber->stack_size))
301 err(EXIT_FAILURE,
"munmap");
311 fctx->
__p->backtraces_enabled = 1;
313 fctx->
__p->backtraces_enabled = 0;
322 struct fiber_id_tailq_i *item;
326 ev_set_cb(e_watcher->
w, NULL);
330 item = e_mutex->ev_base.data;
331 TAILQ_REMOVE(&e_mutex->
mutex->pending, item, entries);
336 item = e_cond->ev_base.data;
337 TAILQ_REMOVE(&e_cond->
cond->waiting, item, entries);
348 assert(NULL == fiber->ev.arrived);
349 assert(NULL != fiber->ev.waiting);
351 fiber->ev.arrived = ev;
353 for (ev_pptr = fiber->ev.waiting; NULL != *ev_pptr; ev_pptr++) {
355 if (ev_ptr != fiber->ev.arrived)
360 static void ev_watcher_cb(_unused_ EV_P_ ev_watcher *w, _unused_
int event)
362 struct fbr_fiber *fiber;
369 retval = fbr_id_unpack(
FBR_A_ &fiber, ev->ev_base.
id);
372 " the watcher callback, but it's id is not valid: %s",
377 post_ev(
FBR_A_ fiber, &ev->ev_base);
379 retval = fbr_transfer(
FBR_A_ fbr_id_pack(fiber));
384 static void fbr_free_in_fiber(_unused_
FBR_P_ _unused_
struct fbr_fiber *fiber,
385 void *ptr,
int destructor)
387 struct mem_pool *pool_entry = NULL;
390 pool_entry = (
struct mem_pool *)ptr - 1;
391 if (pool_entry->ptr != pool_entry) {
393 "fiber memory pool entry", ptr);
394 if (!RUNNING_ON_VALGRIND)
397 LIST_REMOVE(pool_entry, entries);
398 if (destructor && pool_entry->destructor)
399 pool_entry->destructor(
FBR_A_ ptr, pool_entry->destructor_context);
403 static void fiber_cleanup(
FBR_P_ struct fbr_fiber *fiber)
405 struct mem_pool *p, *x;
407 LIST_REMOVE(fiber, entries.children);
408 LIST_FOREACH_SAFE(p, &fiber->pool, entries, x) {
409 fbr_free_in_fiber(
FBR_A_ fiber, p + 1, 1);
415 struct fbr_fiber *fiber;
417 unpack_transfer_errno(-1, &fiber,
id);
419 fill_trace_info(
FBR_A_ &fiber->reclaim_tinfo);
420 reclaim_children(
FBR_A_ fiber);
421 fiber_cleanup(
FBR_A_ fiber);
422 fiber->id = fctx->
__p->last_id++;
423 LIST_INSERT_HEAD(&fctx->
__p->reclaimed, fiber, entries.reclaimed);
430 if (0 == fbr_id_unpack(
FBR_A_ NULL,
id))
437 return CURRENT_FIBER_ID;
440 static void call_wrapper(
FBR_P)
443 struct fbr_fiber *fiber = CURRENT_FIBER;
445 fiber->func(
FBR_A_ fiber->func_arg);
447 retval = fbr_reclaim(
FBR_A_ fbr_id_pack(fiber));
453 enum ev_action_hint {
464 struct fiber_id_tailq_i *item;
468 if (!ev_is_active(e_watcher->
w))
470 e_watcher->
w->data = e_watcher;
471 ev_set_cb(e_watcher->
w, ev_watcher_cb);
475 if (0 == e_mutex->
mutex->locked_by) {
476 e_mutex->
mutex->locked_by = CURRENT_FIBER_ID;
477 return EV_AH_ARRIVED;
480 item = id_tailq_i_for(
FBR_A_ CURRENT_FIBER);
483 TAILQ_INSERT_TAIL(&e_mutex->
mutex->pending, item, entries);
487 if (0 == e_cond->
mutex->locked_by)
488 return_error(-1, FBR_EINVAL);
489 item = id_tailq_i_for(
FBR_A_ CURRENT_FIBER);
492 TAILQ_INSERT_TAIL(&e_cond->
cond->waiting, item, entries);
510 ev_set_cb(e_watcher->
w, NULL);
520 struct fbr_fiber *fiber = CURRENT_FIBER;
523 enum ev_action_hint hint;
525 fiber->ev.arrived = NULL;
526 fiber->ev.waiting = events;
528 for (ev_pptr = events; NULL != *ev_pptr; ev_pptr++) {
529 hint = prepare_ev(
FBR_A_ *ev_pptr);
534 fiber->ev.arrived = *ev_pptr;
535 while (--ev_pptr >= events)
536 cancel_ev(
FBR_A_ *ev_pptr);
539 return_error(NULL, FBR_EINVAL);
544 while (NULL == fiber->ev.arrived)
547 ev_ptr = fiber->ev.arrived;
548 fiber->ev.arrived = NULL;
549 fiber->ev.waiting = NULL;
552 return_success(ev_ptr);
559 assert(ev_ptr == one);
565 struct fbr_fiber *callee;
566 struct fbr_fiber *caller = fctx->
__p->sp->fiber;
568 unpack_transfer_errno(-1, &callee, to);
572 fctx->
__p->sp->fiber = callee;
573 fill_trace_info(
FBR_A_ &fctx->
__p->sp->tinfo);
575 coro_transfer(&caller->ctx, &callee->ctx);
582 struct fbr_fiber *callee = fctx->
__p->sp->fiber;
583 struct fbr_fiber *caller = (--fctx->
__p->sp)->fiber;
584 coro_transfer(&callee->ctx, &caller->ctx);
591 flags = fcntl(fd, F_GETFL, 0);
593 return_error(-1, FBR_ESYSTEM);
596 s = fcntl(fd, F_SETFL, flags);
598 return_error(-1, FBR_ESYSTEM);
607 ev->id = CURRENT_FIBER_ID;
617 ssize_t fbr_read(
FBR_P_ int fd,
void *buf,
size_t count)
623 ev_io_init(&io, NULL, fd, EV_READ);
624 ev_io_start(fctx->
__p->loop, &io);
626 fbr_ev_watcher_init(
FBR_A_ &watcher, (ev_watcher *)&io);
627 fbr_ev_wait_one(
FBR_A_ &watcher.ev_base);
630 r = read(fd, buf, count);
631 }
while (-1 == r && EINTR == errno);
633 ev_io_stop(fctx->
__p->loop, &io);
638 ssize_t fbr_read_all(
FBR_P_ int fd,
void *buf,
size_t count)
645 ev_io_init(&io, NULL, fd, EV_READ);
646 ev_io_start(fctx->
__p->loop, &io);
648 fbr_ev_watcher_init(
FBR_A_ &watcher, (ev_watcher *)&io);
650 while (count != done) {
652 fbr_ev_wait_one(
FBR_A_ &watcher.ev_base);
654 r = read(fd, buf + done, count - done);
671 ev_io_stop(fctx->
__p->loop, &io);
672 return (ssize_t)done;
675 ev_io_stop(fctx->
__p->loop, &io);
679 ssize_t fbr_readline(
FBR_P_ int fd,
void *buffer,
size_t n)
686 if (n <= 0 || buffer == NULL) {
695 num_read = fbr_read(
FBR_A_ fd, &ch, 1);
697 if (num_read == -1) {
703 }
else if (num_read == 0) {
710 if (total_read < n - 1) {
724 ssize_t fbr_write(
FBR_P_ int fd,
const void *buf,
size_t count)
730 ev_io_init(&io, NULL, fd, EV_WRITE);
731 ev_io_start(fctx->
__p->loop, &io);
733 fbr_ev_watcher_init(
FBR_A_ &watcher, (ev_watcher *)&io);
734 fbr_ev_wait_one(
FBR_A_ &watcher.ev_base);
737 r = write(fd, buf, count);
738 }
while (-1 == r && EINTR == errno);
740 ev_io_stop(fctx->
__p->loop, &io);
744 ssize_t fbr_write_all(
FBR_P_ int fd,
const void *buf,
size_t count)
751 ev_io_init(&io, NULL, fd, EV_WRITE);
752 ev_io_start(fctx->
__p->loop, &io);
754 fbr_ev_watcher_init(
FBR_A_ &watcher, (ev_watcher *)&io);
756 while (count != done) {
758 fbr_ev_wait_one(
FBR_A_ &watcher.ev_base);
760 r = write(fd, buf + done, count - done);
775 ev_io_stop(fctx->
__p->loop, &io);
776 return (ssize_t)done;
779 ev_io_stop(fctx->
__p->loop, &io);
783 ssize_t fbr_recvfrom(
FBR_P_ int sockfd,
void *buf,
size_t len,
int flags,
784 struct sockaddr *src_addr, socklen_t *addrlen)
789 ev_io_init(&io, NULL, sockfd, EV_READ);
790 ev_io_start(fctx->
__p->loop, &io);
792 fbr_ev_watcher_init(
FBR_A_ &watcher, (ev_watcher *)&io);
793 fbr_ev_wait_one(
FBR_A_ &watcher.ev_base);
795 ev_io_stop(fctx->
__p->loop, &io);
797 return recvfrom(sockfd, buf, len, flags, src_addr, addrlen);
800 ssize_t fbr_sendto(
FBR_P_ int sockfd,
const void *buf,
size_t len,
int flags,
const
801 struct sockaddr *dest_addr, socklen_t addrlen)
806 ev_io_init(&io, NULL, sockfd, EV_WRITE);
807 ev_io_start(fctx->
__p->loop, &io);
809 fbr_ev_watcher_init(
FBR_A_ &watcher, (ev_watcher *)&io);
810 fbr_ev_wait_one(
FBR_A_ &watcher.ev_base);
812 ev_io_stop(fctx->
__p->loop, &io);
814 return sendto(sockfd, buf, len, flags, dest_addr, addrlen);
817 int fbr_accept(
FBR_P_ int sockfd,
struct sockaddr *addr, socklen_t *addrlen)
823 ev_io_init(&io, NULL, sockfd, EV_READ);
824 ev_io_start(fctx->
__p->loop, &io);
826 fbr_ev_watcher_init(
FBR_A_ &watcher, (ev_watcher *)&io);
827 fbr_ev_wait_one(
FBR_A_ &watcher.ev_base);
830 r = accept(sockfd, addr, addrlen);
831 }
while (-1 == r && EINTR == errno);
833 ev_io_stop(fctx->
__p->loop, &io);
838 ev_tstamp fbr_sleep(
FBR_P_ ev_tstamp seconds)
842 ev_tstamp expected = ev_now(fctx->
__p->loop) + seconds;
844 ev_timer_init(&timer, NULL, seconds, 0.);
845 ev_timer_start(fctx->
__p->loop, &timer);
847 fbr_ev_watcher_init(
FBR_A_ &watcher, (ev_watcher *)&timer);
848 fbr_ev_wait_one(
FBR_A_ &watcher.ev_base);
850 ev_timer_stop(fctx->
__p->loop, &timer);
852 return max(0., expected - ev_now(fctx->
__p->loop));
/* Round `size' up to the next multiple of the system page size.
 * A size that is already page-aligned (including 0) is returned as-is;
 * without that check the function would overshoot by one full page. */
static size_t round_up_to_page_size(size_t size)
{
	long sz;
	size_t remainder;

	sz = sysconf(_SC_PAGESIZE);
	remainder = size % sz;
	if (0 == remainder)
		return size;
	return size + sz - remainder;
}
870 struct fbr_fiber *fiber;
871 if (!LIST_EMPTY(&fctx->
__p->reclaimed)) {
872 fiber = LIST_FIRST(&fctx->
__p->reclaimed);
873 LIST_REMOVE(fiber, entries.reclaimed);
875 fiber = malloc(
sizeof(
struct fbr_fiber));
876 memset(fiber, 0x00,
sizeof(
struct fbr_fiber));
879 stack_size = round_up_to_page_size(stack_size);
880 fiber->stack = mmap(NULL, stack_size, PROT_READ | PROT_WRITE,
881 MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
882 if (MAP_FAILED == fiber->stack)
883 err(EXIT_FAILURE,
"mmap failed");
884 fiber->stack_size = stack_size;
885 (void)VALGRIND_STACK_REGISTER(fiber->stack, fiber->stack +
887 fiber->id = fctx->
__p->last_id++;
889 coro_create(&fiber->ctx, (coro_func)call_wrapper,
FBR_A, fiber->stack,
891 fiber->call_list = NULL;
892 fiber->call_list_size = 0;
893 LIST_INIT(&fiber->children);
894 LIST_INIT(&fiber->pool);
897 fiber->func_arg = arg;
898 LIST_INSERT_HEAD(&CURRENT_FIBER->children, fiber, entries.children);
899 fiber->parent = CURRENT_FIBER;
900 return fbr_id_pack(fiber);
905 struct fbr_fiber *fiber, *parent;
907 unpack_transfer_errno(-1, &parent, parent_id);
909 parent = &fctx->
__p->root;
910 fiber = CURRENT_FIBER;
911 LIST_REMOVE(fiber, entries.children);
912 LIST_INSERT_HEAD(&parent->children, fiber, entries.children);
913 fiber->parent = parent;
919 struct fbr_fiber *fiber = CURRENT_FIBER;
920 if (fiber->parent == &fctx->
__p->root)
922 return fbr_id_pack(fiber->parent);
925 void *fbr_calloc(
FBR_P_ unsigned int nmemb,
size_t size)
928 ptr = allocate_in_fiber(
FBR_A_ nmemb * size, CURRENT_FIBER);
929 memset(ptr, 0x00, nmemb * size);
935 return allocate_in_fiber(
FBR_A_ size, CURRENT_FIBER);
941 struct mem_pool *pool_entry;
942 pool_entry = (
struct mem_pool *)ptr - 1;
943 pool_entry->destructor = func;
944 pool_entry->destructor_context = context;
949 fbr_free_in_fiber(
FBR_A_ CURRENT_FIBER, ptr, 1);
954 fbr_free_in_fiber(
FBR_A_ CURRENT_FIBER, ptr, 0);
959 struct fbr_stack_item *ptr = fctx->
__p->sp;
960 (*log)(
FBR_A_ "%s",
"Fiber call stack:");
961 (*log)(FBR_A_
"%s",
"-------------------------------");
962 while (ptr >= fctx->
__p->stack) {
963 (*log)(FBR_A_
"fiber_call: %p\t%s",
966 print_trace_info(FBR_A_ &ptr->tinfo, log);
967 (*log)(FBR_A_
"%s",
"-------------------------------");
972 static void transfer_later(
FBR_P_ struct fiber_id_tailq_i *item)
975 was_empty = TAILQ_EMPTY(&fctx->
__p->pending_fibers);
976 TAILQ_INSERT_TAIL(&fctx->
__p->pending_fibers, item, entries);
977 if (was_empty && !TAILQ_EMPTY(&fctx->
__p->pending_fibers))
978 ev_async_start(fctx->
__p->loop, &fctx->
__p->pending_async);
979 ev_async_send(fctx->
__p->loop, &fctx->
__p->pending_async);
982 static void transfer_later_tailq(
FBR_P_ struct fiber_id_tailq *tailq)
985 was_empty = TAILQ_EMPTY(&fctx->
__p->pending_fibers);
986 TAILQ_CONCAT(&fctx->
__p->pending_fibers, tailq, entries);
987 if (was_empty && !TAILQ_EMPTY(&fctx->
__p->pending_fibers))
988 ev_async_start(fctx->
__p->loop, &fctx->
__p->pending_async);
989 ev_async_send(fctx->
__p->loop, &fctx->
__p->pending_async);
993 struct fbr_mutex *mutex)
1001 struct fbr_mutex *mutex;
1002 mutex = malloc(
sizeof(
struct fbr_mutex));
1003 mutex->locked_by = 0;
1004 TAILQ_INIT(&mutex->pending);
1008 void fbr_mutex_lock(
FBR_P_ struct fbr_mutex *mutex)
1012 fbr_ev_mutex_init(
FBR_A_ &ev, mutex);
1013 fbr_ev_wait_one(
FBR_A_ &ev.ev_base);
1014 assert(mutex->locked_by == CURRENT_FIBER_ID);
1017 int fbr_mutex_trylock(
FBR_P_ struct fbr_mutex *mutex)
1019 if (0 == mutex->locked_by) {
1020 mutex->locked_by = CURRENT_FIBER_ID;
1026 void fbr_mutex_unlock(
FBR_P_ struct fbr_mutex *mutex)
1028 struct fiber_id_tailq_i *item, *x;
1029 struct fbr_fiber *fiber = NULL;
1031 if (TAILQ_EMPTY(&mutex->pending)) {
1032 mutex->locked_by = 0;
1036 TAILQ_FOREACH_SAFE(item, &mutex->pending, entries, x) {
1037 TAILQ_REMOVE(&mutex->pending, item, entries);
1038 if (-1 == fbr_id_unpack(
FBR_A_ &fiber, item->id)) {
1039 assert(FBR_ENOFIBER == fctx->
f_errno);
1046 mutex->locked_by = item->id;
1047 post_ev(
FBR_A_ fiber, item->ev);
1049 transfer_later(
FBR_A_ item);
1054 struct fiber_id_tailq_i *item, *x;
1055 TAILQ_FOREACH_SAFE(item, &mutex->pending, entries, x) {
1062 struct fbr_cond_var *cond,
struct fbr_mutex *mutex)
1071 struct fbr_cond_var *cond;
1072 cond = malloc(
sizeof(
struct fbr_cond_var));
1074 TAILQ_INIT(&cond->waiting);
1080 struct fiber_id_tailq_i *item, *x;
1081 TAILQ_FOREACH_SAFE(item, &cond->waiting, entries, x) {
1087 int fbr_cond_wait(
FBR_P_ struct fbr_cond_var *cond,
struct fbr_mutex *mutex)
1091 if (0 == mutex->locked_by)
1092 return_error(-1, FBR_EINVAL);
1094 fbr_ev_cond_var_init(
FBR_A_ &ev, cond, mutex);
1095 fbr_ev_wait_one(
FBR_A_ &ev.ev_base);
1099 void fbr_cond_broadcast(
FBR_P_ struct fbr_cond_var *cond)
1101 struct fiber_id_tailq_i *item;
1102 struct fbr_fiber *fiber;
1103 if (TAILQ_EMPTY(&cond->waiting))
1105 TAILQ_FOREACH(item, &cond->waiting, entries) {
1106 if(-1 == fbr_id_unpack(
FBR_A_ &fiber, item->id)) {
1107 assert(FBR_ENOFIBER == fctx->
f_errno);
1110 post_ev(
FBR_A_ fiber, item->ev);
1112 transfer_later_tailq(
FBR_A_ &cond->waiting);
1115 void fbr_cond_signal(
FBR_P_ struct fbr_cond_var *cond)
1117 struct fiber_id_tailq_i *item;
1118 struct fbr_fiber *fiber;
1119 if (TAILQ_EMPTY(&cond->waiting))
1121 item = TAILQ_FIRST(&cond->waiting);
1122 if(-1 == fbr_id_unpack(
FBR_A_ &fiber, item->id)) {
1123 assert(FBR_ENOFIBER == fctx->
f_errno);
1126 post_ev(
FBR_A_ fiber, item->ev);
1128 TAILQ_REMOVE(&cond->waiting, item, entries);
1129 transfer_later(
FBR_A_ item);
1132 struct fbr_buffer *fbr_buffer_create(
FBR_P_ size_t size)
1134 struct fbr_buffer *buffer;
1136 buffer = malloc(
sizeof(
struct fbr_buffer));
1138 return_error(NULL, FBR_ESYSTEM);
1140 buffer->vrb = vrb_new(size, NULL);
1141 buffer->prepared_bytes = 0;
1142 buffer->waiting_bytes = 0;
1147 return_success(buffer);
1150 void fbr_buffer_free(
FBR_P_ struct fbr_buffer *buffer)
1152 vrb_destroy(buffer->vrb);
1162 void *fbr_buffer_alloc_prepare(
FBR_P_ struct fbr_buffer *buffer,
size_t size)
1164 if (size > vrb_capacity(buffer->vrb))
1165 return_error(NULL, FBR_EINVAL);
1167 fbr_mutex_lock(
FBR_A_ buffer->write_mutex);
1169 while (buffer->prepared_bytes > 0)
1170 fbr_cond_wait(
FBR_A_ buffer->committed_cond,
1171 buffer->write_mutex);
1173 assert(0 == buffer->prepared_bytes);
1175 buffer->prepared_bytes = size;
1177 while ((
size_t)vrb_space_len(buffer->vrb) < size)
1178 fbr_cond_wait(
FBR_A_ buffer->bytes_freed_cond,
1179 buffer->write_mutex);
1181 return vrb_space_ptr(buffer->vrb);
1184 void fbr_buffer_alloc_commit(
FBR_P_ struct fbr_buffer *buffer)
1186 vrb_give(buffer->vrb, buffer->prepared_bytes);
1187 buffer->prepared_bytes = 0;
1188 fbr_cond_signal(
FBR_A_ buffer->committed_cond);
1189 fbr_mutex_unlock(
FBR_A_ buffer->write_mutex);
1192 void fbr_buffer_alloc_abort(
FBR_P_ struct fbr_buffer *buffer)
1194 buffer->prepared_bytes = 0;
1195 fbr_cond_signal(
FBR_A_ buffer->committed_cond);
1196 fbr_mutex_unlock(
FBR_A_ buffer->write_mutex);
1199 void *fbr_buffer_read_address(
FBR_P_ struct fbr_buffer *buffer,
size_t size)
1202 if (size > vrb_capacity(buffer->vrb))
1203 return_error(NULL, FBR_EINVAL);
1205 fbr_mutex_lock(
FBR_A_ buffer->read_mutex);
1207 while ((
size_t)vrb_data_len(buffer->vrb) < size) {
1208 retval = fbr_cond_wait(
FBR_A_ buffer->committed_cond,
1209 buffer->read_mutex);
1210 assert(0 == retval);
1213 buffer->waiting_bytes = size;
1215 return_success(vrb_data_ptr(buffer->vrb));
1218 void fbr_buffer_read_advance(
FBR_P_ struct fbr_buffer *buffer)
1220 vrb_take(buffer->vrb, buffer->waiting_bytes);
1222 fbr_cond_signal(
FBR_A_ buffer->bytes_freed_cond);
1223 fbr_mutex_unlock(
FBR_A_ buffer->read_mutex);
1226 void fbr_buffer_read_discard(
FBR_P_ struct fbr_buffer *buffer)
1228 fbr_mutex_unlock(
FBR_A_ buffer->read_mutex);
1233 return vrb_data_len(buffer->vrb);
1238 return vrb_space_len(buffer->vrb);
1241 struct fbr_cond_var *fbr_buffer_cond_read(_unused_
FBR_P_ struct fbr_buffer *buffer)
1243 return buffer->committed_cond;
1246 struct fbr_cond_var *fbr_buffer_cond_write(_unused_
FBR_P_ struct fbr_buffer *buffer)
1248 return buffer->bytes_freed_cond;
1253 struct fbr_fiber *fiber;
1254 unpack_transfer_errno(NULL, &fiber,
id);
1255 return_success(fiber->user_data);
1260 struct fbr_fiber *fiber;
1261 unpack_transfer_errno(-1, &fiber,
id);
1262 fiber->user_data = data;