libevfibers
 All Data Structures Files Functions Variables Typedefs Enumerations Enumerator Macros Pages
fiber.c
1 /********************************************************************
2 
3  Copyright 2012 Konstantin Olkhovskiy <lupus@oxnull.net>
4 
5  This file is part of libevfibers.
6 
7  libevfibers is free software: you can redistribute it and/or modify
8  it under the terms of the GNU Lesser General Public License as
9  published by the Free Software Foundation, either version 3 of the
10  License, or any later version.
11 
12  libevfibers is distributed in the hope that it will be useful, but
13  WITHOUT ANY WARRANTY; without even the implied warranty of
14  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15  Lesser General Public License for more details.
16 
17  You should have received a copy of the GNU Lesser General
18  Public License along with libevfibers. If not, see
19  <http://www.gnu.org/licenses/>.
20 
21  ********************************************************************/
22 
#include <sys/mman.h>
#include <fcntl.h>
#include <assert.h>
#include <errno.h>
#include <stdint.h>
#include <utlist.h>
#include <stdio.h>
#include <string.h>
#include <err.h>
#include <valgrind/valgrind.h>

#include <evfibers_private/fiber.h>
34 
/* Fallback definitions of the BSD queue.h *_FOREACH_SAFE iterators:
 * they capture the next element before the loop body runs, so the
 * current element may be removed (and freed) safely during iteration. */
#ifndef LIST_FOREACH_SAFE
#define LIST_FOREACH_SAFE(var, head, field, next_var) \
	for ((var) = ((head)->lh_first); \
			(var) && ((next_var) = ((var)->field.le_next), 1); \
			(var) = (next_var))

#endif

#ifndef TAILQ_FOREACH_SAFE
#define TAILQ_FOREACH_SAFE(var, head, field, next_var) \
	for ((var) = ((head)->tqh_first); \
			(var) ? ({ (next_var) = ((var)->field.tqe_next); 1; }) \
			: 0; \
			(var) = (next_var))
#endif
50 
51 
/* Assert that we are currently executing in the root fiber. */
#define ENSURE_ROOT_FIBER do { \
	assert(fctx->__p->sp->fiber == &fctx->__p->root); \
} while (0)

/* Fiber at the top of the context's call stack — the running fiber. */
#define CURRENT_FIBER (fctx->__p->sp->fiber)
#define CURRENT_FIBER_ID (fbr_id_pack(CURRENT_FIBER))
/* True when the frame below the current one belongs to the root fiber. */
#define CALLED_BY_ROOT ((fctx->__p->sp - 1)->fiber == &fctx->__p->root)

/* Unpack `id' into *ptr or return `value'; on failure f_errno is already
 * set by fbr_id_unpack (FBR_ENOFIBER for stale ids). */
#define unpack_transfer_errno(value, ptr, id) \
	do { \
		if (-1 == fbr_id_unpack(fctx, ptr, id)) \
			return (value); \
	} while (0)

/* Set f_errno to FBR_SUCCESS and return the given value. */
#define return_success(value) \
	do { \
		fctx->f_errno = FBR_SUCCESS; \
		return (value); \
	} while (0)

/* Set f_errno to `code' and return the given value. */
#define return_error(value, code) \
	do { \
		fctx->f_errno = (code); \
		return (value); \
	} while (0)
77 
78 
79 static fbr_id_t fbr_id_pack(struct fbr_fiber *fiber)
80 {
81  return ((__uint128_t)fiber->id << 64) | (uint64_t)fiber;
82 }
83 
84 static int fbr_id_unpack(FBR_P_ struct fbr_fiber **ptr, fbr_id_t id)
85 {
86  struct fbr_fiber *fiber;
87  uint64_t f_id;
88  f_id = (uint64_t)(id >> 64);
89  fiber = (struct fbr_fiber *)(uint64_t)id;
90  if (fiber->id != f_id)
91  return_error(-1, FBR_ENOFIBER);
92  if (ptr)
93  *ptr = fiber;
94  return 0;
95 }
96 
/* ev_async callback that services the deferred-transfer queue filled by
 * transfer_later()/transfer_later_tailq().  Runs in the root fiber; it
 * dispatches one queued fiber per invocation and re-arms itself until
 * the queue is drained. */
static void pending_async_cb(EV_P_ ev_async *w, _unused_ int revents)
{
	struct fbr_context *fctx;
	struct fiber_id_tailq_i *item;
	fctx = (struct fbr_context *)w->data;
	int retval;

	ENSURE_ROOT_FIBER;

	if (TAILQ_EMPTY(&fctx->__p->pending_fibers)) {
		ev_async_stop(EV_A_ &fctx->__p->pending_async);
		return;
	}

	item = TAILQ_FIRST(&fctx->__p->pending_fibers);
	TAILQ_REMOVE(&fctx->__p->pending_fibers, item, entries);
	/* Stop the watcher once drained; otherwise poke it again so the
	 * next item is handled on a subsequent loop iteration. */
	if (TAILQ_EMPTY(&fctx->__p->pending_fibers))
		ev_async_stop(EV_A_ &fctx->__p->pending_async);
	else
		ev_async_send(EV_A_ &fctx->__p->pending_async);

	retval = fbr_transfer(FBR_A_ item->id);
	free(item);
	/* FBR_ENOFIBER is expected if the fiber was reclaimed while it sat
	 * in the queue; any other failure indicates a real bug. */
	if (-1 == retval && FBR_ENOFIBER != fctx->f_errno) {
		fbr_log_e(FBR_A_ "libevfibers: unexpected error trying to call"
				" a fiber by id: %s",
				fbr_strerror(FBR_A_ fctx->f_errno));
	}
}
126 
127 static void *allocate_in_fiber(FBR_P_ size_t size, struct fbr_fiber *in)
128 {
129  struct mem_pool *pool_entry;
130  pool_entry = malloc(size + sizeof(struct mem_pool));
131  if (NULL == pool_entry) {
132  fbr_log_e(FBR_A_ "libevfibers: unable to allocate %zu bytes\n",
133  size + sizeof(struct mem_pool));
134  abort();
135  }
136  pool_entry->ptr = pool_entry;
137  pool_entry->destructor = NULL;
138  pool_entry->destructor_context = NULL;
139  LIST_INSERT_HEAD(&in->pool, pool_entry, entries);
140  return pool_entry + 1;
141 }
142 
143 static void stdio_logger(FBR_P_ struct fbr_logger *logger, enum fbr_log_level level,
144  const char *format, va_list ap)
145 {
146  struct fbr_fiber *fiber;
147  FILE* stream;
148  char *str_level;
149 
150  if (level > logger->level)
151  return;
152 
153  fiber = CURRENT_FIBER;
154 
155  switch (level) {
156  case FBR_LOG_ERROR:
157  str_level = "ERROR";
158  stream = stderr;
159  break;
160  case FBR_LOG_WARNING:
161  str_level = "WARNING";
162  stream = stdout;
163  break;
164  case FBR_LOG_NOTICE:
165  str_level = "NOTICE";
166  stream = stdout;
167  break;
168  case FBR_LOG_INFO:
169  str_level = "INFO";
170  stream = stdout;
171  break;
172  case FBR_LOG_DEBUG:
173  str_level = "DEBUG";
174  stream = stdout;
175  break;
176  default:
177  str_level = "?????";
178  stream = stdout;
179  break;
180  }
181  fprintf(stream, "%-7s %-16s ", str_level, fiber->name);
182  vfprintf(stream, format, ap);
183  fprintf(stream, "\n");
184 }
185 
/* Initialize a libevfibers context: sets up the root fiber, a default
 * stdio logger at NOTICE level, and the deferred-transfer async watcher
 * bound to the given libev loop. */
void fbr_init(FBR_P_ struct ev_loop *loop)
{
	struct fbr_fiber *root;
	struct fbr_logger *logger;

	/* NOTE(review): malloc result is unchecked; an OOM here crashes on
	 * the first dereference below. */
	fctx->__p = malloc(sizeof(struct fbr_context_private));
	LIST_INIT(&fctx->__p->reclaimed);
	LIST_INIT(&fctx->__p->root.children);
	LIST_INIT(&fctx->__p->root.pool);
	TAILQ_INIT(&fctx->__p->pending_fibers);

	root = &fctx->__p->root;
	root->name = "root";
	fctx->__p->last_id = 0;
	root->id = fctx->__p->last_id++;
	coro_create(&root->ctx, NULL, NULL, NULL, 0);

	logger = allocate_in_fiber(FBR_A_ sizeof(struct fbr_logger), root);
	logger->logv = stdio_logger;
	logger->level = FBR_LOG_NOTICE;
	fctx->logger = logger;

	fctx->__p->sp = fctx->__p->stack;
	fctx->__p->sp->fiber = root;
	/* Backtraces are enabled only around the fill_trace_info() call for
	 * the root frame and disabled again a few lines below — presumably
	 * to record the root trace while defaulting the feature to off;
	 * TODO(review): confirm this set-then-clear sequence is intended. */
	fctx->__p->backtraces_enabled = 1;
	fill_trace_info(FBR_A_ &fctx->__p->sp->tinfo);
	fctx->__p->loop = loop;
	fctx->__p->pending_async.data = fctx;
	fctx->__p->backtraces_enabled = 0;
	ev_async_init(&fctx->__p->pending_async, pending_async_cb);
}
217 
218 const char *fbr_strerror(_unused_ FBR_P_ enum fbr_error_code code)
219 {
220  switch (code) {
221  case FBR_SUCCESS:
222  return "Success";
223  case FBR_EINVAL:
224  return "Invalid argument";
225  case FBR_ENOFIBER:
226  return "No such fiber";
227  case FBR_ESYSTEM:
228  return "System error, consult system errno";
229  case FBR_EBUFFERMMAP:
230  return "Failed to mmap two adjacent regions";
231  }
232  return "Unknown error";
233 }
234 
235 void fbr_log_e(FBR_P_ const char *format, ...)
236 {
237  va_list ap;
238  va_start(ap, format);
239  (*fctx->logger->logv)(FBR_A_ fctx->logger, FBR_LOG_ERROR, format, ap);
240  va_end(ap);
241 }
242 
243 void fbr_log_w(FBR_P_ const char *format, ...)
244 {
245  va_list ap;
246  va_start(ap, format);
247  (*fctx->logger->logv)(FBR_A_ fctx->logger, FBR_LOG_WARNING, format, ap);
248  va_end(ap);
249 }
250 
251 void fbr_log_n(FBR_P_ const char *format, ...)
252 {
253  va_list ap;
254  va_start(ap, format);
255  (*fctx->logger->logv)(FBR_A_ fctx->logger, FBR_LOG_NOTICE, format, ap);
256  va_end(ap);
257 }
258 
259 void fbr_log_i(FBR_P_ const char *format, ...)
260 {
261  va_list ap;
262  va_start(ap, format);
263  (*fctx->logger->logv)(FBR_A_ fctx->logger, FBR_LOG_INFO, format, ap);
264  va_end(ap);
265 }
266 
267 void fbr_log_d(FBR_P_ const char *format, ...)
268 {
269  va_list ap;
270  va_start(ap, format);
271  (*fctx->logger->logv)(FBR_A_ fctx->logger, FBR_LOG_DEBUG, format, ap);
272  va_end(ap);
273 }
274 
275 static struct fiber_id_tailq_i *id_tailq_i_for(_unused_ FBR_P_
276  struct fbr_fiber *fiber)
277 {
278  struct fiber_id_tailq_i *item;
279  item = malloc(sizeof(struct fiber_id_tailq_i));
280  item->id = fbr_id_pack(fiber);
281  item->ev = NULL;
282  return item;
283 }
284 
285 static void reclaim_children(FBR_P_ struct fbr_fiber *fiber)
286 {
287  struct fbr_fiber *f;
288  LIST_FOREACH(f, &fiber->children, entries.children) {
289  fbr_reclaim(FBR_A_ fbr_id_pack(f));
290  }
291 }
292 
293 void fbr_destroy(FBR_P)
294 {
295  struct fbr_fiber *fiber, *x;
296 
297  reclaim_children(FBR_A_ &fctx->__p->root);
298 
299  LIST_FOREACH_SAFE(fiber, &fctx->__p->reclaimed, entries.reclaimed, x) {
300  if (0 != munmap(fiber->stack, fiber->stack_size))
301  err(EXIT_FAILURE, "munmap");
302  free(fiber);
303  }
304 
305  free(fctx->__p);
306 }
307 
308 void fbr_enable_backtraces(FBR_P_ int enabled)
309 {
310  if (enabled)
311  fctx->__p->backtraces_enabled = 1;
312  else
313  fctx->__p->backtraces_enabled = 0;
314 
315 }
316 
/* Undo the registration performed by prepare_ev() for an event that is
 * no longer being waited upon. */
static void cancel_ev(_unused_ FBR_P_ struct fbr_ev_base *ev)
{
	struct fbr_ev_watcher *e_watcher;
	struct fbr_ev_mutex *e_mutex;
	struct fbr_ev_cond_var *e_cond;
	struct fiber_id_tailq_i *item;
	switch (ev->type) {
	case FBR_EV_WATCHER:
		/* Detach our callback; the caller owns and stops the
		 * watcher itself. */
		e_watcher = fbr_ev_upcast(ev, fbr_ev_watcher);
		ev_set_cb(e_watcher->w, NULL);
		break;
	case FBR_EV_MUTEX:
		/* Drop ourselves from the mutex waiters queue. */
		e_mutex = fbr_ev_upcast(ev, fbr_ev_mutex);
		item = e_mutex->ev_base.data;
		TAILQ_REMOVE(&e_mutex->mutex->pending, item, entries);
		free(item);
		break;
	case FBR_EV_COND_VAR:
		/* Drop ourselves from the condition waiters queue. */
		e_cond = fbr_ev_upcast(ev, fbr_ev_cond_var);
		item = e_cond->ev_base.data;
		TAILQ_REMOVE(&e_cond->cond->waiting, item, entries);
		free(item);
		break;
	}
}
342 
/* Record `ev' as the event that arrived for `fiber' and cancel all other
 * events the fiber was simultaneously waiting on — fbr_ev_wait()
 * delivers exactly one event per wakeup. */
static void post_ev(FBR_P_ struct fbr_fiber *fiber, struct fbr_ev_base *ev)
{
	struct fbr_ev_base **ev_pptr;
	struct fbr_ev_base *ev_ptr;

	assert(NULL == fiber->ev.arrived);
	assert(NULL != fiber->ev.waiting);

	fiber->ev.arrived = ev;

	for (ev_pptr = fiber->ev.waiting; NULL != *ev_pptr; ev_pptr++) {
		ev_ptr = *ev_pptr;
		if (ev_ptr != fiber->ev.arrived)
			cancel_ev(FBR_A_ ev_ptr);
	}
}
359 
/* libev callback installed by prepare_ev() for FBR_EV_WATCHER events:
 * posts the event to the waiting fiber and immediately transfers
 * control to it.  Always runs in the root fiber. */
static void ev_watcher_cb(_unused_ EV_P_ ev_watcher *w, _unused_ int event)
{
	struct fbr_fiber *fiber;
	struct fbr_ev_watcher *ev = w->data;
	struct fbr_context *fctx = ev->ev_base.fctx;
	int retval;

	ENSURE_ROOT_FIBER;

	/* A stale id here means the fiber was reclaimed without cancelling
	 * its watcher — that is a fatal inconsistency. */
	retval = fbr_id_unpack(FBR_A_ &fiber, ev->ev_base.id);
	if (-1 == retval) {
		fbr_log_e(FBR_A_ "libevfibers: fiber is about to be called by"
				" the watcher callback, but it's id is not valid: %s",
				fbr_strerror(FBR_A_ fctx->f_errno));
		abort();
	}

	post_ev(FBR_A_ fiber, &ev->ev_base);

	retval = fbr_transfer(FBR_A_ fbr_id_pack(fiber));
	assert(0 == retval);
}
382 
383 
384 static void fbr_free_in_fiber(_unused_ FBR_P_ _unused_ struct fbr_fiber *fiber,
385  void *ptr, int destructor)
386 {
387  struct mem_pool *pool_entry = NULL;
388  if (NULL == ptr)
389  return;
390  pool_entry = (struct mem_pool *)ptr - 1;
391  if (pool_entry->ptr != pool_entry) {
392  fbr_log_e(FBR_A_ "libevfibers: address %p does not look like "
393  "fiber memory pool entry", ptr);
394  if (!RUNNING_ON_VALGRIND)
395  abort();
396  }
397  LIST_REMOVE(pool_entry, entries);
398  if (destructor && pool_entry->destructor)
399  pool_entry->destructor(FBR_A_ ptr, pool_entry->destructor_context);
400  free(pool_entry);
401 }
402 
403 static void fiber_cleanup(FBR_P_ struct fbr_fiber *fiber)
404 {
405  struct mem_pool *p, *x;
406  /* coro_destroy(&fiber->ctx); */
407  LIST_REMOVE(fiber, entries.children);
408  LIST_FOREACH_SAFE(p, &fiber->pool, entries, x) {
409  fbr_free_in_fiber(FBR_A_ fiber, p + 1, 1);
410  }
411 }
412 
/* Reclaim a fiber: recursively reclaims its children, releases its
 * memory pool, bumps its generation counter (invalidating every
 * outstanding id for it) and parks the structure on the reclaimed list
 * for reuse by fbr_create().  The stack stays mapped.
 * Returns 0 on success, -1 with f_errno set when the id is stale. */
int fbr_reclaim(FBR_P_ fbr_id_t id)
{
	struct fbr_fiber *fiber;

	unpack_transfer_errno(-1, &fiber, id);

	/* Remember where the reclaim happened, for debugging. */
	fill_trace_info(FBR_A_ &fiber->reclaim_tinfo);
	reclaim_children(FBR_A_ fiber);
	fiber_cleanup(FBR_A_ fiber);
	/* New generation: old fbr_id_t values now fail fbr_id_unpack()
	 * with FBR_ENOFIBER. */
	fiber->id = fctx->__p->last_id++;
	LIST_INSERT_HEAD(&fctx->__p->reclaimed, fiber, entries.reclaimed);

	return_success(0);
}
427 
428 int fbr_is_reclaimed(_unused_ FBR_P_ fbr_id_t id)
429 {
430  if (0 == fbr_id_unpack(FBR_A_ NULL, id))
431  return 0;
432  return 1;
433 }
434 
/* Return the id of the currently running fiber. */
fbr_id_t fbr_self(FBR_P)
{
	return CURRENT_FIBER_ID;
}
439 
/* Entry point every fiber starts in (installed via coro_create in
 * fbr_create): runs the user function, reclaims the fiber when it
 * returns, and yields back to the caller.  Control must never return
 * past that final yield — hence the assert(NULL). */
static void call_wrapper(FBR_P)
{
	int retval;
	struct fbr_fiber *fiber = CURRENT_FIBER;

	fiber->func(FBR_A_ fiber->func_arg);

	retval = fbr_reclaim(FBR_A_ fbr_id_pack(fiber));
	assert(0 == retval);
	fbr_yield(FBR_A);
	assert(NULL);
}
452 
/* Outcome of prepare_ev(): the event was armed (OK), was satisfied
 * immediately (ARRIVED), or the request was invalid (EINVAL). */
enum ev_action_hint {
	EV_AH_OK = 0,
	EV_AH_ARRIVED,
	EV_AH_EINVAL
};
458 
459 static enum ev_action_hint prepare_ev(FBR_P_ struct fbr_ev_base *ev)
460 {
461  struct fbr_ev_watcher *e_watcher;
462  struct fbr_ev_mutex *e_mutex;
463  struct fbr_ev_cond_var *e_cond;
464  struct fiber_id_tailq_i *item;
465  switch (ev->type) {
466  case FBR_EV_WATCHER:
467  e_watcher = fbr_ev_upcast(ev, fbr_ev_watcher);
468  if (!ev_is_active(e_watcher->w))
469  return EV_AH_EINVAL;
470  e_watcher->w->data = e_watcher;
471  ev_set_cb(e_watcher->w, ev_watcher_cb);
472  break;
473  case FBR_EV_MUTEX:
474  e_mutex = fbr_ev_upcast(ev, fbr_ev_mutex);
475  if (0 == e_mutex->mutex->locked_by) {
476  e_mutex->mutex->locked_by = CURRENT_FIBER_ID;
477  return EV_AH_ARRIVED;
478  }
479 
480  item = id_tailq_i_for(FBR_A_ CURRENT_FIBER);
481  item->ev = ev;
482  ev->data = item;
483  TAILQ_INSERT_TAIL(&e_mutex->mutex->pending, item, entries);
484  break;
485  case FBR_EV_COND_VAR:
486  e_cond = fbr_ev_upcast(ev, fbr_ev_cond_var);
487  if (0 == e_cond->mutex->locked_by)
488  return_error(-1, FBR_EINVAL);
489  item = id_tailq_i_for(FBR_A_ CURRENT_FIBER);
490  item->ev = ev;
491  ev->data = item;
492  TAILQ_INSERT_TAIL(&e_cond->cond->waiting, item, entries);
493  fbr_mutex_unlock(FBR_A_ e_cond->mutex);
494  break;
495  }
496  return EV_AH_OK;
497 }
498 
/* Post-wakeup bookkeeping for the event that actually arrived. */
static void finish_ev(FBR_P_ struct fbr_ev_base *ev)
{
	struct fbr_ev_cond_var *e_cond;
	struct fbr_ev_watcher *e_watcher;
	switch (ev->type) {
	case FBR_EV_COND_VAR:
		/* A condition wait returns with the mutex re-acquired. */
		e_cond = fbr_ev_upcast(ev, fbr_ev_cond_var);
		fbr_mutex_lock(FBR_A_ e_cond->mutex);
		break;
	case FBR_EV_WATCHER:
		/* Detach our callback from the caller-owned watcher. */
		e_watcher = fbr_ev_upcast(ev, fbr_ev_watcher);
		ev_set_cb(e_watcher->w, NULL);
		break;
	case FBR_EV_MUTEX:
		/* NOP */
		break;
	}
}
517 
/* Block the current fiber until one of the NULL-terminated array of
 * events arrives; returns the arrived event, or NULL with
 * f_errno = FBR_EINVAL when an event fails to arm.
 * NOTE(review): on the EINVAL path, events armed earlier in the loop are
 * not cancelled — verify whether multi-event callers can hit this. */
struct fbr_ev_base *fbr_ev_wait(FBR_P_ struct fbr_ev_base *events[])
{
	struct fbr_fiber *fiber = CURRENT_FIBER;
	struct fbr_ev_base **ev_pptr;
	struct fbr_ev_base *ev_ptr;
	enum ev_action_hint hint;

	fiber->ev.arrived = NULL;
	fiber->ev.waiting = events;

	for (ev_pptr = events; NULL != *ev_pptr; ev_pptr++) {
		hint = prepare_ev(FBR_A_ *ev_pptr);
		switch (hint) {
		case EV_AH_OK:
			break;
		case EV_AH_ARRIVED:
			/* Satisfied immediately: cancel the events armed so
			 * far and skip the yield loop. */
			fiber->ev.arrived = *ev_pptr;
			while (--ev_pptr >= events)
				cancel_ev(FBR_A_ *ev_pptr);
			goto loop_end;
		case EV_AH_EINVAL:
			return_error(NULL, FBR_EINVAL);
		}
	}

loop_end:
	/* Yield to the event loop until post_ev() marks an arrival. */
	while (NULL == fiber->ev.arrived)
		fbr_yield(FBR_A);

	ev_ptr = fiber->ev.arrived;
	fiber->ev.arrived = NULL;
	fiber->ev.waiting = NULL;

	finish_ev(FBR_A_ ev_ptr);
	return_success(ev_ptr);
}
554 
555 void fbr_ev_wait_one(FBR_P_ struct fbr_ev_base *one)
556 {
557  struct fbr_ev_base *ev_ptr;
558  ev_ptr = fbr_ev_wait(FBR_A_ (struct fbr_ev_base *[]){one, NULL});
559  assert(ev_ptr == one);
560  return;
561 }
562 
/* Switch execution to the fiber identified by `to', pushing a frame on
 * the context's fiber call stack.  Returns 0 once the callee yields
 * back, or -1 with f_errno set when the id is stale. */
int fbr_transfer(FBR_P_ fbr_id_t to)
{
	struct fbr_fiber *callee;
	struct fbr_fiber *caller = fctx->__p->sp->fiber;

	unpack_transfer_errno(-1, &callee, to);

	/* Push the callee's frame onto the fiber call stack. */
	fctx->__p->sp++;

	fctx->__p->sp->fiber = callee;
	fill_trace_info(FBR_A_ &fctx->__p->sp->tinfo);

	coro_transfer(&caller->ctx, &callee->ctx);

	return_success(0);
}
579 
/* Return control to the fiber one frame down the call stack (the one
 * that transferred to us), popping the current frame. */
void fbr_yield(FBR_P)
{
	struct fbr_fiber *callee = fctx->__p->sp->fiber;
	struct fbr_fiber *caller = (--fctx->__p->sp)->fiber;
	coro_transfer(&callee->ctx, &caller->ctx);
}
586 
587 int fbr_fd_nonblock(FBR_P_ int fd)
588 {
589  int flags, s;
590 
591  flags = fcntl(fd, F_GETFL, 0);
592  if (flags == -1)
593  return_error(-1, FBR_ESYSTEM);
594 
595  flags |= O_NONBLOCK;
596  s = fcntl(fd, F_SETFL, flags);
597  if (s == -1)
598  return_error(-1, FBR_ESYSTEM);
599 
600  return_success(0);
601 }
602 
603 static void ev_base_init(FBR_P_ struct fbr_ev_base *ev,
604  enum fbr_ev_type type)
605 {
606  ev->type = type;
607  ev->id = CURRENT_FIBER_ID;
608  ev->fctx = fctx;
609 }
610 
611 void fbr_ev_watcher_init(FBR_P_ struct fbr_ev_watcher *ev, ev_watcher *w)
612 {
613  ev_base_init(FBR_A_ &ev->ev_base, FBR_EV_WATCHER);
614  ev->w = w;
615 }
616 
/* Fiber-blocking read(2): waits until fd is readable, then performs a
 * single read (retrying only on EINTR).  Returns what read(2) returns. */
ssize_t fbr_read(FBR_P_ int fd, void *buf, size_t count)
{
	ssize_t r;
	ev_io io;
	struct fbr_ev_watcher watcher;

	/* Callback is NULL here; prepare_ev() installs ev_watcher_cb when
	 * the fiber actually waits on the watcher. */
	ev_io_init(&io, NULL, fd, EV_READ);
	ev_io_start(fctx->__p->loop, &io);

	fbr_ev_watcher_init(FBR_A_ &watcher, (ev_watcher *)&io);
	fbr_ev_wait_one(FBR_A_ &watcher.ev_base);

	do {
		r = read(fd, buf, count);
	} while (-1 == r && EINTR == errno);

	ev_io_stop(fctx->__p->loop, &io);

	return r;
}
637 
/* Read exactly `count' bytes from fd (fiber-blocking), unless EOF is hit
 * first.  Returns the number of bytes actually read, or -1 on error. */
ssize_t fbr_read_all(FBR_P_ int fd, void *buf, size_t count)
{
	ssize_t r;
	size_t done = 0;
	ev_io io;
	struct fbr_ev_watcher watcher;

	ev_io_init(&io, NULL, fd, EV_READ);
	ev_io_start(fctx->__p->loop, &io);

	fbr_ev_watcher_init(FBR_A_ &watcher, (ev_watcher *)&io);

	while (count != done) {
next:
		/* Wait for readability, then read as much as possible. */
		fbr_ev_wait_one(FBR_A_ &watcher.ev_base);
		for (;;) {
			r = read(fd, buf + done, count - done);
			if (-1 == r) {
				switch (errno) {
				case EINTR:
					continue;
				case EAGAIN:
					/* Drained: wait for readiness again. */
					goto next;
				default:
					goto error;
				}
			}
			break;
		}
		/* EOF: return a short count. */
		if (0 == r)
			break;
		done += r;
	}
	ev_io_stop(fctx->__p->loop, &io);
	return (ssize_t)done;

error:
	ev_io_stop(fctx->__p->loop, &io);
	return -1;
}
678 
/* Read one line (up to and including '\n') from fd into buffer and NUL
 * terminate it.  At most n-1 bytes are stored; excess bytes of an
 * overlong line are read and discarded.  Returns the number of bytes
 * stored, 0 on immediate EOF, -1 on error with errno set. */
ssize_t fbr_readline(FBR_P_ int fd, void *buffer, size_t n)
{
	ssize_t num_read;
	size_t total_read;
	char *buf;
	char ch;

	/* n is unsigned, so n <= 0 effectively means n == 0. */
	if (n <= 0 || buffer == NULL) {
		errno = EINVAL;
		return -1;
	}

	buf = buffer;

	total_read = 0;
	for (;;) {
		/* One byte at a time via the fiber-blocking read. */
		num_read = fbr_read(FBR_A_ fd, &ch, 1);

		if (num_read == -1) {
			if (errno == EINTR)
				continue;
			else
				return -1;

		} else if (num_read == 0) {
			/* EOF: report 0 only when nothing was read yet. */
			if (total_read == 0)
				return 0;
			else
				break;

		} else {
			/* Store while room remains, keeping one byte free
			 * for the terminating NUL. */
			if (total_read < n - 1) {
				total_read++;
				*buf++ = ch;
			}

			if (ch == '\n')
				break;
		}
	}

	*buf = '\0';
	return total_read;
}
723 
/* Fiber-blocking write(2): waits until fd is writable, then performs a
 * single write (retrying only on EINTR).  Returns what write(2)
 * returns. */
ssize_t fbr_write(FBR_P_ int fd, const void *buf, size_t count)
{
	ssize_t r;
	ev_io io;
	struct fbr_ev_watcher watcher;

	/* Callback is NULL here; prepare_ev() installs ev_watcher_cb when
	 * the fiber actually waits on the watcher. */
	ev_io_init(&io, NULL, fd, EV_WRITE);
	ev_io_start(fctx->__p->loop, &io);

	fbr_ev_watcher_init(FBR_A_ &watcher, (ev_watcher *)&io);
	fbr_ev_wait_one(FBR_A_ &watcher.ev_base);

	do {
		r = write(fd, buf, count);
	} while (-1 == r && EINTR == errno);

	ev_io_stop(fctx->__p->loop, &io);
	return r;
}
743 
/* Write exactly `count' bytes to fd (fiber-blocking).  Returns `count'
 * on success or -1 on error. */
ssize_t fbr_write_all(FBR_P_ int fd, const void *buf, size_t count)
{
	ssize_t r;
	size_t done = 0;
	ev_io io;
	struct fbr_ev_watcher watcher;

	ev_io_init(&io, NULL, fd, EV_WRITE);
	ev_io_start(fctx->__p->loop, &io);

	fbr_ev_watcher_init(FBR_A_ &watcher, (ev_watcher *)&io);

	while (count != done) {
next:
		/* Wait for writability, then write as much as possible. */
		fbr_ev_wait_one(FBR_A_ &watcher.ev_base);
		for (;;) {
			r = write(fd, buf + done, count - done);
			if (-1 == r) {
				switch (errno) {
				case EINTR:
					continue;
				case EAGAIN:
					/* Kernel buffer full: wait again. */
					goto next;
				default:
					goto error;
				}
			}
			break;
		}
		done += r;
	}
	ev_io_stop(fctx->__p->loop, &io);
	return (ssize_t)done;

error:
	ev_io_stop(fctx->__p->loop, &io);
	return -1;
}
782 
783 ssize_t fbr_recvfrom(FBR_P_ int sockfd, void *buf, size_t len, int flags,
784  struct sockaddr *src_addr, socklen_t *addrlen)
785 {
786  ev_io io;
787  struct fbr_ev_watcher watcher;
788 
789  ev_io_init(&io, NULL, sockfd, EV_READ);
790  ev_io_start(fctx->__p->loop, &io);
791 
792  fbr_ev_watcher_init(FBR_A_ &watcher, (ev_watcher *)&io);
793  fbr_ev_wait_one(FBR_A_ &watcher.ev_base);
794 
795  ev_io_stop(fctx->__p->loop, &io);
796 
797  return recvfrom(sockfd, buf, len, flags, src_addr, addrlen);
798 }
799 
800 ssize_t fbr_sendto(FBR_P_ int sockfd, const void *buf, size_t len, int flags, const
801  struct sockaddr *dest_addr, socklen_t addrlen)
802 {
803  ev_io io;
804  struct fbr_ev_watcher watcher;
805 
806  ev_io_init(&io, NULL, sockfd, EV_WRITE);
807  ev_io_start(fctx->__p->loop, &io);
808 
809  fbr_ev_watcher_init(FBR_A_ &watcher, (ev_watcher *)&io);
810  fbr_ev_wait_one(FBR_A_ &watcher.ev_base);
811 
812  ev_io_stop(fctx->__p->loop, &io);
813 
814  return sendto(sockfd, buf, len, flags, dest_addr, addrlen);
815 }
816 
817 int fbr_accept(FBR_P_ int sockfd, struct sockaddr *addr, socklen_t *addrlen)
818 {
819  int r;
820  ev_io io;
821  struct fbr_ev_watcher watcher;
822 
823  ev_io_init(&io, NULL, sockfd, EV_READ);
824  ev_io_start(fctx->__p->loop, &io);
825 
826  fbr_ev_watcher_init(FBR_A_ &watcher, (ev_watcher *)&io);
827  fbr_ev_wait_one(FBR_A_ &watcher.ev_base);
828 
829  do {
830  r = accept(sockfd, addr, addrlen);
831  } while (-1 == r && EINTR == errno);
832 
833  ev_io_stop(fctx->__p->loop, &io);
834 
835  return r;
836 }
837 
838 ev_tstamp fbr_sleep(FBR_P_ ev_tstamp seconds)
839 {
840  ev_timer timer;
841  struct fbr_ev_watcher watcher;
842  ev_tstamp expected = ev_now(fctx->__p->loop) + seconds;
843 
844  ev_timer_init(&timer, NULL, seconds, 0.);
845  ev_timer_start(fctx->__p->loop, &timer);
846 
847  fbr_ev_watcher_init(FBR_A_ &watcher, (ev_watcher *)&timer);
848  fbr_ev_wait_one(FBR_A_ &watcher.ev_base);
849 
850  ev_timer_stop(fctx->__p->loop, &timer);
851 
852  return max(0., expected - ev_now(fctx->__p->loop));
853 }
854 
/* Round `size' up to the next multiple of the system page size (which is
 * queried once via sysconf and cached). */
static size_t round_up_to_page_size(size_t size)
{
	static long page_size;
	size_t tail;

	if (0 == page_size)
		page_size = sysconf(_SC_PAGESIZE);
	tail = size % page_size;
	return (0 == tail) ? size : size + page_size - tail;
}
866 
867 fbr_id_t fbr_create(FBR_P_ const char *name, fbr_fiber_func_t func, void *arg,
868  size_t stack_size)
869 {
870  struct fbr_fiber *fiber;
871  if (!LIST_EMPTY(&fctx->__p->reclaimed)) {
872  fiber = LIST_FIRST(&fctx->__p->reclaimed);
873  LIST_REMOVE(fiber, entries.reclaimed);
874  } else {
875  fiber = malloc(sizeof(struct fbr_fiber));
876  memset(fiber, 0x00, sizeof(struct fbr_fiber));
877  if (0 == stack_size)
878  stack_size = FBR_STACK_SIZE;
879  stack_size = round_up_to_page_size(stack_size);
880  fiber->stack = mmap(NULL, stack_size, PROT_READ | PROT_WRITE,
881  MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
882  if (MAP_FAILED == fiber->stack)
883  err(EXIT_FAILURE, "mmap failed");
884  fiber->stack_size = stack_size;
885  (void)VALGRIND_STACK_REGISTER(fiber->stack, fiber->stack +
886  stack_size);
887  fiber->id = fctx->__p->last_id++;
888  }
889  coro_create(&fiber->ctx, (coro_func)call_wrapper, FBR_A, fiber->stack,
891  fiber->call_list = NULL;
892  fiber->call_list_size = 0;
893  LIST_INIT(&fiber->children);
894  LIST_INIT(&fiber->pool);
895  fiber->name = name;
896  fiber->func = func;
897  fiber->func_arg = arg;
898  LIST_INSERT_HEAD(&CURRENT_FIBER->children, fiber, entries.children);
899  fiber->parent = CURRENT_FIBER;
900  return fbr_id_pack(fiber);
901 }
902 
/* Re-parent the current fiber under `parent_id', or under the root
 * fiber when parent_id is 0 (the encoding fbr_parent() uses for root).
 * Returns 0 on success, -1 with f_errno set for a stale parent id. */
int fbr_disown(FBR_P_ fbr_id_t parent_id)
{
	struct fbr_fiber *fiber, *parent;
	if (parent_id > 0)
		unpack_transfer_errno(-1, &parent, parent_id);
	else
		parent = &fctx->__p->root;
	fiber = CURRENT_FIBER;
	LIST_REMOVE(fiber, entries.children);
	LIST_INSERT_HEAD(&parent->children, fiber, entries.children);
	fiber->parent = parent;
	return_success(0);
}
916 
917 fbr_id_t fbr_parent(FBR_P)
918 {
919  struct fbr_fiber *fiber = CURRENT_FIBER;
920  if (fiber->parent == &fctx->__p->root)
921  return 0;
922  return fbr_id_pack(fiber->parent);
923 }
924 
925 void *fbr_calloc(FBR_P_ unsigned int nmemb, size_t size)
926 {
927  void *ptr;
928  ptr = allocate_in_fiber(FBR_A_ nmemb * size, CURRENT_FIBER);
929  memset(ptr, 0x00, nmemb * size);
930  return ptr;
931 }
932 
/* Allocate `size' bytes owned by the current fiber; the memory is
 * released automatically when the fiber is reclaimed. */
void *fbr_alloc(FBR_P_ size_t size)
{
	return allocate_in_fiber(FBR_A_ size, CURRENT_FIBER);
}
937 
938 void fbr_alloc_set_destructor(_unused_ FBR_P_ void *ptr,
939  fbr_alloc_destructor_func_t func, void *context)
940 {
941  struct mem_pool *pool_entry;
942  pool_entry = (struct mem_pool *)ptr - 1;
943  pool_entry->destructor = func;
944  pool_entry->destructor_context = context;
945 }
946 
/* Free fiber-pool memory, running its destructor if one is set. */
void fbr_free(FBR_P_ void *ptr)
{
	fbr_free_in_fiber(FBR_A_ CURRENT_FIBER, ptr, 1);
}
951 
/* Free fiber-pool memory WITHOUT running its destructor ("nd" = no
 * destructor). */
void fbr_free_nd(FBR_P_ void *ptr)
{
	fbr_free_in_fiber(FBR_A_ CURRENT_FIBER, ptr, 0);
}
956 
957 void fbr_dump_stack(FBR_P_ fbr_logutil_func_t log)
958 {
959  struct fbr_stack_item *ptr = fctx->__p->sp;
960  (*log)(FBR_A_ "%s", "Fiber call stack:");
961  (*log)(FBR_A_ "%s", "-------------------------------");
962  while (ptr >= fctx->__p->stack) {
963  (*log)(FBR_A_ "fiber_call: %p\t%s",
964  ptr->fiber,
965  ptr->fiber->name);
966  print_trace_info(FBR_A_ &ptr->tinfo, log);
967  (*log)(FBR_A_ "%s", "-------------------------------");
968  ptr--;
969  }
970 }
971 
/* Queue one fiber for transfer on a later event-loop iteration via the
 * pending_async watcher (serviced by pending_async_cb). */
static void transfer_later(FBR_P_ struct fiber_id_tailq_i *item)
{
	int was_empty;
	was_empty = TAILQ_EMPTY(&fctx->__p->pending_fibers);
	TAILQ_INSERT_TAIL(&fctx->__p->pending_fibers, item, entries);
	/* The watcher is kept stopped while the queue is empty; start it
	 * only on the empty -> non-empty transition. */
	if (was_empty && !TAILQ_EMPTY(&fctx->__p->pending_fibers))
		ev_async_start(fctx->__p->loop, &fctx->__p->pending_async);
	ev_async_send(fctx->__p->loop, &fctx->__p->pending_async);
}
981 
/* Queue an entire tailq of fibers for deferred transfer; the source
 * queue is emptied by TAILQ_CONCAT. */
static void transfer_later_tailq(FBR_P_ struct fiber_id_tailq *tailq)
{
	int was_empty;
	was_empty = TAILQ_EMPTY(&fctx->__p->pending_fibers);
	TAILQ_CONCAT(&fctx->__p->pending_fibers, tailq, entries);
	/* Start the watcher only on the empty -> non-empty edge (the
	 * concatenated queue may itself have been empty). */
	if (was_empty && !TAILQ_EMPTY(&fctx->__p->pending_fibers))
		ev_async_start(fctx->__p->loop, &fctx->__p->pending_async);
	ev_async_send(fctx->__p->loop, &fctx->__p->pending_async);
}
991 
992 void fbr_ev_mutex_init(FBR_P_ struct fbr_ev_mutex *ev,
993  struct fbr_mutex *mutex)
994 {
995  ev_base_init(FBR_A_ &ev->ev_base, FBR_EV_MUTEX);
996  ev->mutex = mutex;
997 }
998 
999 struct fbr_mutex *fbr_mutex_create(_unused_ FBR_P)
1000 {
1001  struct fbr_mutex *mutex;
1002  mutex = malloc(sizeof(struct fbr_mutex));
1003  mutex->locked_by = 0;
1004  TAILQ_INIT(&mutex->pending);
1005  return mutex;
1006 }
1007 
/* Acquire the mutex, blocking the calling fiber until it is available. */
void fbr_mutex_lock(FBR_P_ struct fbr_mutex *mutex)
{
	struct fbr_ev_mutex ev;

	fbr_ev_mutex_init(FBR_A_ &ev, mutex);
	fbr_ev_wait_one(FBR_A_ &ev.ev_base);
	assert(mutex->locked_by == CURRENT_FIBER_ID);
}
1016 
1017 int fbr_mutex_trylock(FBR_P_ struct fbr_mutex *mutex)
1018 {
1019  if (0 == mutex->locked_by) {
1020  mutex->locked_by = CURRENT_FIBER_ID;
1021  return 1;
1022  }
1023  return 0;
1024 }
1025 
1026 void fbr_mutex_unlock(FBR_P_ struct fbr_mutex *mutex)
1027 {
1028  struct fiber_id_tailq_i *item, *x;
1029  struct fbr_fiber *fiber = NULL;
1030 
1031  if (TAILQ_EMPTY(&mutex->pending)) {
1032  mutex->locked_by = 0;
1033  return;
1034  }
1035 
1036  TAILQ_FOREACH_SAFE(item, &mutex->pending, entries, x) {
1037  TAILQ_REMOVE(&mutex->pending, item, entries);
1038  if (-1 == fbr_id_unpack(FBR_A_ &fiber, item->id)) {
1039  assert(FBR_ENOFIBER == fctx->f_errno);
1040  free(item);
1041  continue;
1042  }
1043  break;
1044  }
1045 
1046  mutex->locked_by = item->id;
1047  post_ev(FBR_A_ fiber, item->ev);
1048 
1049  transfer_later(FBR_A_ item);
1050 }
1051 
1052 void fbr_mutex_destroy(_unused_ FBR_P_ struct fbr_mutex *mutex)
1053 {
1054  struct fiber_id_tailq_i *item, *x;
1055  TAILQ_FOREACH_SAFE(item, &mutex->pending, entries, x) {
1056  free(item);
1057  }
1058  free(mutex);
1059 }
1060 
1061 void fbr_ev_cond_var_init(FBR_P_ struct fbr_ev_cond_var *ev,
1062  struct fbr_cond_var *cond, struct fbr_mutex *mutex)
1063 {
1064  ev_base_init(FBR_A_ &ev->ev_base, FBR_EV_COND_VAR);
1065  ev->cond = cond;
1066  ev->mutex = mutex;
1067 }
1068 
1069 struct fbr_cond_var *fbr_cond_create(_unused_ FBR_P)
1070 {
1071  struct fbr_cond_var *cond;
1072  cond = malloc(sizeof(struct fbr_cond_var));
1073  cond->mutex = NULL;
1074  TAILQ_INIT(&cond->waiting);
1075  return cond;
1076 }
1077 
1078 void fbr_cond_destroy(_unused_ FBR_P_ struct fbr_cond_var *cond)
1079 {
1080  struct fiber_id_tailq_i *item, *x;
1081  TAILQ_FOREACH_SAFE(item, &cond->waiting, entries, x) {
1082  free(item);
1083  }
1084  free(cond);
1085 }
1086 
/* Release `mutex' and wait on `cond'; the mutex is re-acquired before
 * returning (see prepare_ev()/finish_ev()).  Returns 0, or -1 with
 * f_errno = FBR_EINVAL when the mutex is not currently locked. */
int fbr_cond_wait(FBR_P_ struct fbr_cond_var *cond, struct fbr_mutex *mutex)
{
	struct fbr_ev_cond_var ev;

	if (0 == mutex->locked_by)
		return_error(-1, FBR_EINVAL);

	fbr_ev_cond_var_init(FBR_A_ &ev, cond, mutex);
	fbr_ev_wait_one(FBR_A_ &ev.ev_base);
	return_success(0);
}
1098 
/* Wake every fiber waiting on the condition variable.  Stale waiter ids
 * (reclaimed fibers) are skipped; the whole waiting queue is handed to
 * the deferred-transfer machinery, which also frees the items. */
void fbr_cond_broadcast(FBR_P_ struct fbr_cond_var *cond)
{
	struct fiber_id_tailq_i *item;
	struct fbr_fiber *fiber;
	if (TAILQ_EMPTY(&cond->waiting))
		return;
	TAILQ_FOREACH(item, &cond->waiting, entries) {
		if(-1 == fbr_id_unpack(FBR_A_ &fiber, item->id)) {
			assert(FBR_ENOFIBER == fctx->f_errno);
			continue;
		}
		post_ev(FBR_A_ fiber, item->ev);
	}
	transfer_later_tailq(FBR_A_ &cond->waiting);
}
1114 
1115 void fbr_cond_signal(FBR_P_ struct fbr_cond_var *cond)
1116 {
1117  struct fiber_id_tailq_i *item;
1118  struct fbr_fiber *fiber;
1119  if (TAILQ_EMPTY(&cond->waiting))
1120  return;
1121  item = TAILQ_FIRST(&cond->waiting);
1122  if(-1 == fbr_id_unpack(FBR_A_ &fiber, item->id)) {
1123  assert(FBR_ENOFIBER == fctx->f_errno);
1124  return;
1125  }
1126  post_ev(FBR_A_ fiber, item->ev);
1127 
1128  TAILQ_REMOVE(&cond->waiting, item, entries);
1129  transfer_later(FBR_A_ item);
1130 }
1131 
1132 struct fbr_buffer *fbr_buffer_create(FBR_P_ size_t size)
1133 {
1134  struct fbr_buffer *buffer;
1135 
1136  buffer = malloc(sizeof(struct fbr_buffer));
1137  if(NULL == buffer)
1138  return_error(NULL, FBR_ESYSTEM);
1139 
1140  buffer->vrb = vrb_new(size, NULL);
1141  buffer->prepared_bytes = 0;
1142  buffer->waiting_bytes = 0;
1143  buffer->committed_cond = fbr_cond_create(FBR_A);
1144  buffer->bytes_freed_cond = fbr_cond_create(FBR_A);
1145  buffer->write_mutex = fbr_mutex_create(FBR_A);
1146  buffer->read_mutex = fbr_mutex_create(FBR_A);
1147  return_success(buffer);
1148 }
1149 
1150 void fbr_buffer_free(FBR_P_ struct fbr_buffer *buffer)
1151 {
1152  vrb_destroy(buffer->vrb);
1153 
1154  fbr_mutex_destroy(FBR_A_ buffer->read_mutex);
1155  fbr_mutex_destroy(FBR_A_ buffer->write_mutex);
1156  fbr_cond_destroy(FBR_A_ buffer->committed_cond);
1157  fbr_cond_destroy(FBR_A_ buffer->bytes_freed_cond);
1158 
1159  free(buffer);
1160 }
1161 
/* Begin a write transaction: reserve `size' contiguous bytes, blocking
 * while a previous transaction is uncommitted or free space is
 * insufficient.  Returns the address to write into, or NULL with
 * f_errno = FBR_EINVAL when size exceeds the buffer capacity.
 * NOTE(review): fbr_cond_wait() return values are ignored here —
 * presumably safe since the mutex is held by this fiber; confirm. */
void *fbr_buffer_alloc_prepare(FBR_P_ struct fbr_buffer *buffer, size_t size)
{
	if (size > vrb_capacity(buffer->vrb))
		return_error(NULL, FBR_EINVAL);

	fbr_mutex_lock(FBR_A_ buffer->write_mutex);

	/* Only one prepared-but-uncommitted region may exist at a time. */
	while (buffer->prepared_bytes > 0)
		fbr_cond_wait(FBR_A_ buffer->committed_cond,
				buffer->write_mutex);

	assert(0 == buffer->prepared_bytes);

	buffer->prepared_bytes = size;

	/* Wait until the ring buffer has room for the reservation. */
	while ((size_t)vrb_space_len(buffer->vrb) < size)
		fbr_cond_wait(FBR_A_ buffer->bytes_freed_cond,
				buffer->write_mutex);

	return vrb_space_ptr(buffer->vrb);
}
1183 
/* Publish the bytes reserved by fbr_buffer_alloc_prepare() to readers
 * and end the write transaction. */
void fbr_buffer_alloc_commit(FBR_P_ struct fbr_buffer *buffer)
{
	vrb_give(buffer->vrb, buffer->prepared_bytes);
	buffer->prepared_bytes = 0;
	fbr_cond_signal(FBR_A_ buffer->committed_cond);
	fbr_mutex_unlock(FBR_A_ buffer->write_mutex);
}
1191 
/* Abandon the reserved bytes without publishing them and end the write
 * transaction. */
void fbr_buffer_alloc_abort(FBR_P_ struct fbr_buffer *buffer)
{
	buffer->prepared_bytes = 0;
	fbr_cond_signal(FBR_A_ buffer->committed_cond);
	fbr_mutex_unlock(FBR_A_ buffer->write_mutex);
}
1198 
/* Begin a read transaction: block until at least `size' committed bytes
 * are available and return the address to read from.  Returns NULL with
 * f_errno = FBR_EINVAL when size exceeds the buffer capacity. */
void *fbr_buffer_read_address(FBR_P_ struct fbr_buffer *buffer, size_t size)
{
	int retval;
	if (size > vrb_capacity(buffer->vrb))
		return_error(NULL, FBR_EINVAL);

	fbr_mutex_lock(FBR_A_ buffer->read_mutex);

	while ((size_t)vrb_data_len(buffer->vrb) < size) {
		retval = fbr_cond_wait(FBR_A_ buffer->committed_cond,
				buffer->read_mutex);
		assert(0 == retval);
	}

	/* Remember how much the matching read_advance() will consume. */
	buffer->waiting_bytes = size;

	return_success(vrb_data_ptr(buffer->vrb));
}
1217 
/* Consume the bytes requested by fbr_buffer_read_address(), wake a
 * writer waiting for free space, and end the read transaction. */
void fbr_buffer_read_advance(FBR_P_ struct fbr_buffer *buffer)
{
	vrb_take(buffer->vrb, buffer->waiting_bytes);

	fbr_cond_signal(FBR_A_ buffer->bytes_freed_cond);
	fbr_mutex_unlock(FBR_A_ buffer->read_mutex);
}
1225 
/* Abort a read transaction without consuming any bytes. */
void fbr_buffer_read_discard(FBR_P_ struct fbr_buffer *buffer)
{
	fbr_mutex_unlock(FBR_A_ buffer->read_mutex);
}
1230 
/* Number of committed bytes currently available for reading. */
size_t fbr_buffer_bytes(_unused_ FBR_P_ struct fbr_buffer *buffer)
{
	return vrb_data_len(buffer->vrb);
}
1235 
/* Number of free bytes currently available for writing. */
size_t fbr_buffer_free_bytes(_unused_ FBR_P_ struct fbr_buffer *buffer)
{
	return vrb_space_len(buffer->vrb);
}
1240 
/* Condition variable signalled when new data is committed (readable). */
struct fbr_cond_var *fbr_buffer_cond_read(_unused_ FBR_P_ struct fbr_buffer *buffer)
{
	return buffer->committed_cond;
}
1245 
/* Condition variable signalled when buffer space is freed (writable). */
struct fbr_cond_var *fbr_buffer_cond_write(_unused_ FBR_P_ struct fbr_buffer *buffer)
{
	return buffer->bytes_freed_cond;
}
1250 
1251 void *fbr_get_user_data(FBR_P_ fbr_id_t id)
1252 {
1253  struct fbr_fiber *fiber;
1254  unpack_transfer_errno(NULL, &fiber, id);
1255  return_success(fiber->user_data);
1256 }
1257 
1258 int fbr_set_user_data(FBR_P_ fbr_id_t id, void *data)
1259 {
1260  struct fbr_fiber *fiber;
1261  unpack_transfer_errno(-1, &fiber, id);
1262  fiber->user_data = data;
1263  return_success(0);
1264 }