libevfibers
fiber.c
1 /********************************************************************
2 
3  Copyright 2013 Konstantin Olkhovskiy <lupus@oxnull.net>
4 
5  Licensed under the Apache License, Version 2.0 (the "License");
6  you may not use this file except in compliance with the License.
7  You may obtain a copy of the License at
8 
9  http://www.apache.org/licenses/LICENSE-2.0
10 
11  Unless required by applicable law or agreed to in writing, software
12  distributed under the License is distributed on an "AS IS" BASIS,
13  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  See the License for the specific language governing permissions and
15  limitations under the License.
16 
17  ********************************************************************/
18 
19 #include <evfibers/config.h>
20 
21 #include <sys/mman.h>
22 #include <fcntl.h>
23 #include <linux/limits.h>
24 #include <libgen.h>
25 #include <assert.h>
26 #include <errno.h>
27 #include <stdio.h>
28 #include <string.h>
29 #include <strings.h>
30 #include <err.h>
31 #ifdef HAVE_VALGRIND_H
32 #include <valgrind/valgrind.h>
33 #else
34 #define RUNNING_ON_VALGRIND (0)
35 #define VALGRIND_STACK_REGISTER(a,b) (void)0
36 #endif
37 
38 #ifdef FBR_EIO_ENABLED
39 #include <evfibers/eio.h>
40 #endif
41 #include <evfibers_private/fiber.h>
42 
43 #ifndef LIST_FOREACH_SAFE
44 #define LIST_FOREACH_SAFE(var, head, field, next_var) \
45  for ((var) = ((head)->lh_first); \
46  (var) && ((next_var) = ((var)->field.le_next), 1); \
47  (var) = (next_var))
48 
49 #endif
50 
51 #ifndef TAILQ_FOREACH_SAFE
52 #define TAILQ_FOREACH_SAFE(var, head, field, next_var) \
53  for ((var) = ((head)->tqh_first); \
54  (var) ? ({ (next_var) = ((var)->field.tqe_next); 1; }) \
55  : 0; \
56  (var) = (next_var))
57 #endif
58 
59 
60 #define ENSURE_ROOT_FIBER do { \
61  assert(fctx->__p->sp->fiber == &fctx->__p->root); \
62 } while (0)
63 
64 #define CURRENT_FIBER (fctx->__p->sp->fiber)
65 #define CURRENT_FIBER_ID (fbr_id_pack(CURRENT_FIBER))
66 #define CALLED_BY_ROOT ((fctx->__p->sp - 1)->fiber == &fctx->__p->root)
67 
68 #define unpack_transfer_errno(value, ptr, id) \
69  do { \
70  if (-1 == fbr_id_unpack(fctx, ptr, id)) \
71  return (value); \
72  } while (0)
73 
74 #define return_success(value) \
75  do { \
76  fctx->f_errno = FBR_SUCCESS; \
77  return (value); \
78  } while (0)
79 
80 #define return_error(value, code) \
81  do { \
82  fctx->f_errno = (code); \
83  return (value); \
84  } while (0)
85 
86 
87 const fbr_id_t FBR_ID_NULL = {0, NULL};
88 static const char default_buffer_pattern[] = "/dev/shm/fbr_buffer.XXXXXXXXX";
89 
90 static fbr_id_t fbr_id_pack(struct fbr_fiber *fiber)
91 {
92  return (struct fbr_id_s){.g = fiber->id, .p = fiber};
93 }
94 
95 static int fbr_id_unpack(FBR_P_ struct fbr_fiber **ptr, fbr_id_t id)
96 {
97  struct fbr_fiber *fiber = id.p;
98  if (NULL == fiber || fiber->id != id.g)
99  return_error(-1, FBR_ENOFIBER);
100  if (ptr)
101  *ptr = id.p;
102  return 0;
103 }
104 
105 static void pending_async_cb(EV_P_ ev_async *w, _unused_ int revents)
106 {
107  struct fbr_context *fctx;
108  struct fbr_id_tailq_i *item;
109  fctx = (struct fbr_context *)w->data;
110  int retval;
111 
112  ENSURE_ROOT_FIBER;
113 
114  if (TAILQ_EMPTY(&fctx->__p->pending_fibers)) {
115  ev_async_stop(EV_A_ &fctx->__p->pending_async);
116  return;
117  }
118 
119  item = TAILQ_FIRST(&fctx->__p->pending_fibers);
120  assert(item->head == &fctx->__p->pending_fibers);
121  /* The item is removed from the queue by a destructor, which is set by
122  * the code requesting delayed execution. The destructor guarantees
123  * removal when the fiber is reclaimed. */
124  ev_async_send(EV_A_ &fctx->__p->pending_async);
125 
126  retval = fbr_transfer(FBR_A_ item->id);
127  if (-1 == retval && FBR_ENOFIBER != fctx->f_errno) {
128  fbr_log_e(FBR_A_ "libevfibers: unexpected error trying to call"
129  " a fiber by id: %s",
130  fbr_strerror(FBR_A_ fctx->f_errno));
131  }
132 }
133 
134 static void *allocate_in_fiber(FBR_P_ size_t size, struct fbr_fiber *in)
135 {
136  struct mem_pool *pool_entry;
137  pool_entry = malloc(size + sizeof(struct mem_pool));
138  if (NULL == pool_entry) {
139  fbr_log_e(FBR_A_ "libevfibers: unable to allocate %zu bytes\n",
140  size + sizeof(struct mem_pool));
141  abort();
142  }
143  pool_entry->ptr = pool_entry;
144  pool_entry->destructor = NULL;
145  pool_entry->destructor_context = NULL;
146  LIST_INSERT_HEAD(&in->pool, pool_entry, entries);
147  return pool_entry + 1;
148 }
149 
150 static void stdio_logger(FBR_P_ struct fbr_logger *logger,
151  enum fbr_log_level level, const char *format, va_list ap)
152 {
153  struct fbr_fiber *fiber;
154  FILE* stream;
155  char *str_level;
156  ev_tstamp tstamp;
157 
158  if (level > logger->level)
159  return;
160 
161  fiber = CURRENT_FIBER;
162 
163  switch (level) {
164  case FBR_LOG_ERROR:
165  str_level = "ERROR";
166  stream = stderr;
167  break;
168  case FBR_LOG_WARNING:
169  str_level = "WARNING";
170  stream = stdout;
171  break;
172  case FBR_LOG_NOTICE:
173  str_level = "NOTICE";
174  stream = stdout;
175  break;
176  case FBR_LOG_INFO:
177  str_level = "INFO";
178  stream = stdout;
179  break;
180  case FBR_LOG_DEBUG:
181  str_level = "DEBUG";
182  stream = stdout;
183  break;
184  default:
185  str_level = "?????";
186  stream = stdout;
187  break;
188  }
189  tstamp = ev_now(fctx->__p->loop);
190  fprintf(stream, "%.6f %-7s %-16s ", tstamp, str_level, fiber->name);
191  vfprintf(stream, format, ap);
192  fprintf(stream, "\n");
193 }
194 
195 void fbr_init(FBR_P_ struct ev_loop *loop)
196 {
197  struct fbr_fiber *root;
198  struct fbr_logger *logger;
199  char *buffer_pattern;
200 
201  fctx->__p = malloc(sizeof(struct fbr_context_private));
202  LIST_INIT(&fctx->__p->reclaimed);
203  LIST_INIT(&fctx->__p->root.children);
204  LIST_INIT(&fctx->__p->root.pool);
205  TAILQ_INIT(&fctx->__p->root.destructors);
206  TAILQ_INIT(&fctx->__p->pending_fibers);
207 
208  root = &fctx->__p->root;
209  strncpy(root->name, "root", FBR_MAX_FIBER_NAME);
210  fctx->__p->last_id = 0;
211  root->id = fctx->__p->last_id++;
212  coro_create(&root->ctx, NULL, NULL, NULL, 0);
213 
214  logger = allocate_in_fiber(FBR_A_ sizeof(struct fbr_logger), root);
215  logger->logv = stdio_logger;
216  logger->level = FBR_LOG_NOTICE;
217  fctx->logger = logger;
218 
219  fctx->__p->sp = fctx->__p->stack;
220  fctx->__p->sp->fiber = root;
221  fctx->__p->backtraces_enabled = 1;
222  fill_trace_info(FBR_A_ &fctx->__p->sp->tinfo);
223  fctx->__p->loop = loop;
224  fctx->__p->pending_async.data = fctx;
225  fctx->__p->backtraces_enabled = 0;
226  memset(&fctx->__p->key_free_mask, 0xFF, /* all keys start out free */
227  sizeof(fctx->__p->key_free_mask));
228  ev_async_init(&fctx->__p->pending_async, pending_async_cb);
229 
230  buffer_pattern = getenv("FBR_BUFFER_FILE_PATTERN");
231  if (buffer_pattern)
232  fctx->__p->buffer_file_pattern = buffer_pattern;
233  else
234  fctx->__p->buffer_file_pattern = default_buffer_pattern;
235 }
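
#if 0
/* Usage sketch (editorial illustration, not part of the original source):
 * the typical lifecycle of a context around fbr_init()/fbr_destroy(),
 * assuming libev's default loop. `my_fiber_func` is a hypothetical
 * fbr_fiber_func_t. */
struct fbr_context context;
fbr_init(&context, EV_DEFAULT);
fbr_id_t id = fbr_create(&context, "my_fiber", my_fiber_func, NULL, 0);
fbr_transfer(&context, id);
ev_run(EV_DEFAULT, 0);
fbr_destroy(&context);
#endif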
236 
237 const char *fbr_strerror(_unused_ FBR_P_ enum fbr_error_code code)
238 {
239  switch (code) {
240  case FBR_SUCCESS:
241  return "Success";
242  case FBR_EINVAL:
243  return "Invalid argument";
244  case FBR_ENOFIBER:
245  return "No such fiber";
246  case FBR_ESYSTEM:
247  return "System error, consult system errno";
248  case FBR_EBUFFERMMAP:
249  return "Failed to mmap two adjacent regions";
250  case FBR_ENOKEY:
251  return "Fiber-local key does not exist";
252  case FBR_EPROTOBUF:
253  return "Protobuf unpacking error";
254  case FBR_EBUFFERNOSPACE:
255  return "Not enough space in the buffer";
256  case FBR_EEIO:
257  return "libeio request error";
258  }
259  return "Unknown error";
260 }
261 
262 void fbr_log_e(FBR_P_ const char *format, ...)
263 {
264  va_list ap;
265  va_start(ap, format);
266  (*fctx->logger->logv)(FBR_A_ fctx->logger, FBR_LOG_ERROR, format, ap);
267  va_end(ap);
268 }
269 
270 void fbr_log_w(FBR_P_ const char *format, ...)
271 {
272  va_list ap;
273  va_start(ap, format);
274  (*fctx->logger->logv)(FBR_A_ fctx->logger, FBR_LOG_WARNING, format, ap);
275  va_end(ap);
276 }
277 
278 void fbr_log_n(FBR_P_ const char *format, ...)
279 {
280  va_list ap;
281  va_start(ap, format);
282  (*fctx->logger->logv)(FBR_A_ fctx->logger, FBR_LOG_NOTICE, format, ap);
283  va_end(ap);
284 }
285 
286 void fbr_log_i(FBR_P_ const char *format, ...)
287 {
288  va_list ap;
289  va_start(ap, format);
290  (*fctx->logger->logv)(FBR_A_ fctx->logger, FBR_LOG_INFO, format, ap);
291  va_end(ap);
292 }
293 
294 void fbr_log_d(FBR_P_ const char *format, ...)
295 {
296  va_list ap;
297  va_start(ap, format);
298  (*fctx->logger->logv)(FBR_A_ fctx->logger, FBR_LOG_DEBUG, format, ap);
299  va_end(ap);
300 }
301 
302 void id_tailq_i_set(_unused_ FBR_P_
303  struct fbr_id_tailq_i *item,
304  struct fbr_fiber *fiber)
305 {
306  item->id = fbr_id_pack(fiber);
307  item->ev = NULL;
308 }
309 
310 static void reclaim_children(FBR_P_ struct fbr_fiber *fiber)
311 {
312  struct fbr_fiber *f;
313  LIST_FOREACH(f, &fiber->children, entries.children) {
314  fbr_reclaim(FBR_A_ fbr_id_pack(f));
315  }
316 }
317 
318 static void fbr_free_in_fiber(_unused_ FBR_P_ _unused_ struct fbr_fiber *fiber,
319  void *ptr, int destructor);
320 
321 void fbr_destroy(FBR_P)
322 {
323  struct fbr_fiber *fiber, *x;
324  struct mem_pool *p, *x2;
325 
326  reclaim_children(FBR_A_ &fctx->__p->root);
327 
328  LIST_FOREACH_SAFE(p, &fctx->__p->root.pool, entries, x2) {
329  fbr_free_in_fiber(FBR_A_ &fctx->__p->root, p + 1, 1);
330  }
331 
332  LIST_FOREACH_SAFE(fiber, &fctx->__p->reclaimed, entries.reclaimed, x) {
333  if (0 != munmap(fiber->stack, fiber->stack_size))
334  err(EXIT_FAILURE, "munmap");
335  free(fiber);
336  }
337 
338  free(fctx->__p);
339 }
340 
341 void fbr_enable_backtraces(FBR_P_ int enabled)
342 {
343  if (enabled)
344  fctx->__p->backtraces_enabled = 1;
345  else
346  fctx->__p->backtraces_enabled = 0;
347 
348 }
349 
350 static void cancel_ev(_unused_ FBR_P_ struct fbr_ev_base *ev)
351 {
352  fbr_destructor_remove(FBR_A_ &ev->item.dtor, 1 /* call it */);
353 }
354 
355 static void post_ev(_unused_ FBR_P_ struct fbr_fiber *fiber,
356  struct fbr_ev_base *ev)
357 {
358  assert(NULL != fiber->ev.waiting);
359 
360  fiber->ev.arrived = 1;
361  ev->arrived = 1;
362 }
363 
364 static void ev_watcher_cb(_unused_ EV_P_ ev_watcher *w, _unused_ int event)
365 {
366  struct fbr_fiber *fiber;
367  struct fbr_ev_watcher *ev = w->data;
368  struct fbr_context *fctx = ev->ev_base.fctx;
369  int retval;
370 
371  ENSURE_ROOT_FIBER;
372 
373  retval = fbr_id_unpack(FBR_A_ &fiber, ev->ev_base.id);
374  if (-1 == retval) {
375  fbr_log_e(FBR_A_ "libevfibers: fiber is about to be called by"
376  " the watcher callback, but its id is not valid: %s",
377  fbr_strerror(FBR_A_ fctx->f_errno));
378  abort();
379  }
380 
381  post_ev(FBR_A_ fiber, &ev->ev_base);
382 
383  retval = fbr_transfer(FBR_A_ fbr_id_pack(fiber));
384  assert(0 == retval);
385 }
386 
387 
388 static void fbr_free_in_fiber(_unused_ FBR_P_ _unused_ struct fbr_fiber *fiber,
389  void *ptr, int destructor)
390 {
391  struct mem_pool *pool_entry = NULL;
392  if (NULL == ptr)
393  return;
394  pool_entry = (struct mem_pool *)ptr - 1;
395  if (pool_entry->ptr != pool_entry) {
396  fbr_log_e(FBR_A_ "libevfibers: address %p does not look like "
397  "fiber memory pool entry", ptr);
398  if (!RUNNING_ON_VALGRIND)
399  abort();
400  }
401  LIST_REMOVE(pool_entry, entries);
402  if (destructor && pool_entry->destructor)
403  pool_entry->destructor(FBR_A_ ptr, pool_entry->destructor_context);
404  free(pool_entry);
405 }
406 
407 static void fiber_cleanup(FBR_P_ struct fbr_fiber *fiber)
408 {
409  struct mem_pool *p, *x;
410  struct fbr_destructor *dtor;
411  /* coro_destroy(&fiber->ctx); */
412  LIST_REMOVE(fiber, entries.children);
413  TAILQ_FOREACH(dtor, &fiber->destructors, entries) {
414  dtor->func(FBR_A_ dtor->arg);
415  }
416  LIST_FOREACH_SAFE(p, &fiber->pool, entries, x) {
417  fbr_free_in_fiber(FBR_A_ fiber, p + 1, 1);
418  }
419 }
420 
421 static void filter_fiber_stack(FBR_P_ struct fbr_fiber *fiber)
422 {
423  struct fbr_stack_item *sp;
424  for (sp = fctx->__p->stack; sp < fctx->__p->sp; sp++) {
425  if (sp->fiber == fiber) {
426  memmove(sp, sp + 1, (fctx->__p->sp - sp) * sizeof(*sp));
427  fctx->__p->sp--;
428  }
429  }
430 }
431 
432 int fbr_reclaim(FBR_P_ fbr_id_t id)
433 {
434  struct fbr_fiber *fiber;
435  struct fbr_mutex mutex;
436  int retval;
437 #if 0
438  struct fbr_fiber *f;
439 #endif
440 
441  unpack_transfer_errno(-1, &fiber, id);
442 
443  fbr_mutex_init(FBR_A_ &mutex);
444  fbr_mutex_lock(FBR_A_ &mutex);
445  while (fiber->no_reclaim) {
446  fiber->want_reclaim = 1;
447  assert("Attempt to reclaim self while no_reclaim is set would"
448  " block forever" && fiber != CURRENT_FIBER);
449  if (-1 == fbr_id_unpack(FBR_A_ NULL, id) &&
450  FBR_ENOFIBER == fctx->f_errno)
451  return_success(0);
452  retval = fbr_cond_wait(FBR_A_ &fiber->reclaim_cond, &mutex);
453  assert(0 == retval);
454  (void)retval;
455  }
456  fbr_mutex_unlock(FBR_A_ &mutex);
457  fbr_mutex_destroy(FBR_A_ &mutex);
458 
459  if (-1 == fbr_id_unpack(FBR_A_ NULL, id) &&
460  FBR_ENOFIBER == fctx->f_errno)
461  return_success(0);
462 
463  fill_trace_info(FBR_A_ &fiber->reclaim_tinfo);
464  reclaim_children(FBR_A_ fiber);
465  fiber_cleanup(FBR_A_ fiber);
466  fiber->id = fctx->__p->last_id++;
467 #if 0
468  LIST_FOREACH(f, &fctx->__p->reclaimed, entries.reclaimed) {
469  assert(f != fiber);
470  }
471 #endif
472  LIST_INSERT_HEAD(&fctx->__p->reclaimed, fiber, entries.reclaimed);
473 
474  filter_fiber_stack(FBR_A_ fiber);
475 
476  if (CURRENT_FIBER == fiber)
477  fbr_yield(FBR_A);
478 
479  return_success(0);
480 }
481 
482 int fbr_set_reclaim(FBR_P_ fbr_id_t id)
483 {
484  struct fbr_fiber *fiber;
485 
486  unpack_transfer_errno(-1, &fiber, id);
487  fiber->no_reclaim = 0;
488  fbr_cond_broadcast(FBR_A_ &fiber->reclaim_cond);
489  return_success(0);
490 }
491 
492 int fbr_set_noreclaim(FBR_P_ fbr_id_t id)
493 {
494  struct fbr_fiber *fiber;
495 
496  unpack_transfer_errno(-1, &fiber, id);
497  fiber->no_reclaim = 1;
498  return_success(0);
499 }
500 
501 int fbr_want_reclaim(FBR_P_ fbr_id_t id)
502 {
503  struct fbr_fiber *fiber;
504 
505  unpack_transfer_errno(-1, &fiber, id);
506  return_success(fiber->want_reclaim);
507 }
508 
509 int fbr_is_reclaimed(_unused_ FBR_P_ fbr_id_t id)
510 {
511  if (0 == fbr_id_unpack(FBR_A_ NULL, id))
512  return 0;
513  return 1;
514 }
515 
516 fbr_id_t fbr_self(FBR_P)
517 {
518  return CURRENT_FIBER_ID;
519 }
520 
521 static void call_wrapper(FBR_P)
522 {
523  int retval;
524  struct fbr_fiber *fiber = CURRENT_FIBER;
525 
526  fiber->func(FBR_A_ fiber->func_arg);
527 
528  retval = fbr_reclaim(FBR_A_ fbr_id_pack(fiber));
529  assert(0 == retval);
530  (void)retval;
531  fbr_yield(FBR_A);
532  assert(NULL);
533 }
534 
535 enum ev_action_hint {
536  EV_AH_OK = 0,
537  EV_AH_ARRIVED,
538  EV_AH_EINVAL
539 };
540 
541 static void item_dtor(_unused_ FBR_P_ void *arg)
542 {
543  struct fbr_id_tailq_i *item = arg;
544 
545  if (item->head) {
546  TAILQ_REMOVE(item->head, item, entries);
547  }
548 }
549 
550 static enum ev_action_hint prepare_ev(FBR_P_ struct fbr_ev_base *ev)
551 {
552  struct fbr_ev_watcher *e_watcher;
553  struct fbr_ev_mutex *e_mutex;
554  struct fbr_ev_cond_var *e_cond;
555  struct fbr_id_tailq_i *item = &ev->item;
556 
557  ev->arrived = 0;
558  ev->item.dtor.func = item_dtor;
559  ev->item.dtor.arg = item;
560  fbr_destructor_add(FBR_A_ &ev->item.dtor);
561 
562  switch (ev->type) {
563  case FBR_EV_WATCHER:
564  e_watcher = fbr_ev_upcast(ev, fbr_ev_watcher);
565  if (!ev_is_active(e_watcher->w)) {
566  fbr_destructor_remove(FBR_A_ &ev->item.dtor,
567  0 /* call it */);
568  return EV_AH_EINVAL;
569  }
570  e_watcher->w->data = e_watcher;
571  ev_set_cb(e_watcher->w, ev_watcher_cb);
572  break;
573  case FBR_EV_MUTEX:
574  e_mutex = fbr_ev_upcast(ev, fbr_ev_mutex);
575  if (fbr_id_isnull(e_mutex->mutex->locked_by)) {
576  e_mutex->mutex->locked_by = CURRENT_FIBER_ID;
577  return EV_AH_ARRIVED;
578  }
579  id_tailq_i_set(FBR_A_ item, CURRENT_FIBER);
580  item->ev = ev;
581  ev->data = item;
582  TAILQ_INSERT_TAIL(&e_mutex->mutex->pending, item, entries);
583  item->head = &e_mutex->mutex->pending;
584  break;
585  case FBR_EV_COND_VAR:
586  e_cond = fbr_ev_upcast(ev, fbr_ev_cond_var);
587  if (fbr_id_isnull(e_cond->mutex->locked_by)) {
588  fbr_destructor_remove(FBR_A_ &ev->item.dtor,
589  0 /* call it */);
590  return EV_AH_EINVAL;
591  }
592  id_tailq_i_set(FBR_A_ item, CURRENT_FIBER);
593  item->ev = ev;
594  ev->data = item;
595  TAILQ_INSERT_TAIL(&e_cond->cond->waiting, item, entries);
596  item->head = &e_cond->cond->waiting;
597  fbr_mutex_unlock(FBR_A_ e_cond->mutex);
598  break;
599  case FBR_EV_EIO:
600 #ifdef FBR_EIO_ENABLED
601  /* NOP */
602 #else
603  fbr_log_e(FBR_A_ "libevfibers: libeio support is not compiled");
604  abort();
605 #endif
606  break;
607  }
608  return EV_AH_OK;
609 }
610 
611 static void finish_ev(FBR_P_ struct fbr_ev_base *ev)
612 {
613  struct fbr_ev_cond_var *e_cond;
614  struct fbr_ev_watcher *e_watcher;
615  fbr_destructor_remove(FBR_A_ &ev->item.dtor, 1 /* call it */);
616  switch (ev->type) {
617  case FBR_EV_COND_VAR:
618  e_cond = fbr_ev_upcast(ev, fbr_ev_cond_var);
619  fbr_mutex_lock(FBR_A_ e_cond->mutex);
620  break;
621  case FBR_EV_WATCHER:
622  e_watcher = fbr_ev_upcast(ev, fbr_ev_watcher);
623  ev_set_cb(e_watcher->w, NULL);
624  break;
625  case FBR_EV_MUTEX:
626  /* NOP */
627  break;
628  case FBR_EV_EIO:
629 #ifdef FBR_EIO_ENABLED
630  /* NOP */
631 #else
632  fbr_log_e(FBR_A_ "libevfibers: libeio support is not compiled");
633  abort();
634 #endif
635  break;
636  }
637 }
638 
639 static void watcher_timer_dtor(_unused_ FBR_P_ void *_arg)
640 {
641  struct ev_timer *w = _arg;
642  ev_timer_stop(fctx->__p->loop, w);
643 }
644 
645 int fbr_ev_wait_to(FBR_P_ struct fbr_ev_base *events[], ev_tstamp timeout)
646 {
647  size_t size;
648  ev_timer timer;
649  struct fbr_ev_watcher watcher;
650  struct fbr_destructor dtor = FBR_DESTRUCTOR_INITIALIZER;
651  struct fbr_ev_base **new_events;
652  struct fbr_ev_base **ev_pptr;
653  int n_events;
654 
655  ev_timer_init(&timer, NULL, timeout, 0.);
656  ev_timer_start(fctx->__p->loop, &timer);
657  fbr_ev_watcher_init(FBR_A_ &watcher,
658  (struct ev_watcher *)&timer);
659  dtor.func = watcher_timer_dtor;
660  dtor.arg = &timer;
661  fbr_destructor_add(FBR_A_ &dtor);
662  size = 0;
663  for (ev_pptr = events; NULL != *ev_pptr; ev_pptr++)
664  size++;
665  new_events = alloca((size + 2) * sizeof(void *));
666  memcpy(new_events, events, size * sizeof(void *));
667  new_events[size] = &watcher.ev_base;
668  new_events[size + 1] = NULL;
669  n_events = fbr_ev_wait(FBR_A_ new_events);
670  fbr_destructor_remove(FBR_A_ &dtor, 1 /* Call it? */);
671  if (n_events < 0)
672  return n_events;
673  if (watcher.ev_base.arrived)
674  n_events--;
675  return n_events;
676 }
677 
678 int fbr_ev_wait(FBR_P_ struct fbr_ev_base *events[])
679 {
680  struct fbr_fiber *fiber = CURRENT_FIBER;
681  enum ev_action_hint hint;
682  int num = 0;
683  int i;
684 
685  fiber->ev.arrived = 0;
686  fiber->ev.waiting = events;
687 
688  for (i = 0; NULL != events[i]; i++) {
689  hint = prepare_ev(FBR_A_ events[i]);
690  switch (hint) {
691  case EV_AH_OK:
692  break;
693  case EV_AH_ARRIVED:
694  fiber->ev.arrived = 1;
695  events[i]->arrived = 1;
696  break;
697  case EV_AH_EINVAL:
698  return_error(-1, FBR_EINVAL);
699  }
700  }
701 
702  while (0 == fiber->ev.arrived)
703  fbr_yield(FBR_A);
704 
705  for (i = 0; NULL != events[i]; i++) {
706  if (events[i]->arrived) {
707  num++;
708  finish_ev(FBR_A_ events[i]);
709  } else
710  cancel_ev(FBR_A_ events[i]);
711  }
712  return_success(num);
713 }
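
#if 0
/* Usage sketch (editorial illustration, not part of the original source):
 * waiting on several events at once from inside a fiber. `io` is a
 * hypothetical, already-started ev_io watcher and `mutex` an initialized
 * struct fbr_mutex. */
struct fbr_ev_watcher e_io;
struct fbr_ev_mutex e_mutex;
struct fbr_ev_base *events[3];
int n_arrived;

fbr_ev_watcher_init(FBR_A_ &e_io, (ev_watcher *)&io);
fbr_ev_mutex_init(FBR_A_ &e_mutex, &mutex);
events[0] = &e_io.ev_base;
events[1] = &e_mutex.ev_base;
events[2] = NULL;
n_arrived = fbr_ev_wait(FBR_A_ events); /* number of events that arrived */
#endif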
714 
715 int fbr_ev_wait_one(FBR_P_ struct fbr_ev_base *one)
716 {
717  struct fbr_fiber *fiber = CURRENT_FIBER;
718  enum ev_action_hint hint;
719  struct fbr_ev_base *events[] = {one, NULL};
720 
721  fiber->ev.arrived = 0;
722  fiber->ev.waiting = events;
723 
724  hint = prepare_ev(FBR_A_ one);
725  switch (hint) {
726  case EV_AH_OK:
727  break;
728  case EV_AH_ARRIVED:
729  goto finish;
730  case EV_AH_EINVAL:
731  return_error(-1, FBR_EINVAL);
732  }
733 
734  while (0 == fiber->ev.arrived)
735  fbr_yield(FBR_A);
736 
737 finish:
738  finish_ev(FBR_A_ one);
739  return 0;
740 }
741 
742 int fbr_transfer(FBR_P_ fbr_id_t to)
743 {
744  struct fbr_fiber *callee;
745  struct fbr_fiber *caller = fctx->__p->sp->fiber;
746 
747  unpack_transfer_errno(-1, &callee, to);
748 
749  fctx->__p->sp++;
750 
751  fctx->__p->sp->fiber = callee;
752  fill_trace_info(FBR_A_ &fctx->__p->sp->tinfo);
753 
754  coro_transfer(&caller->ctx, &callee->ctx);
755 
756  return_success(0);
757 }
758 
759 void fbr_yield(FBR_P)
760 {
761  struct fbr_fiber *callee;
762  struct fbr_fiber *caller;
763  assert("Attempt to yield in a root fiber" &&
764  fctx->__p->sp->fiber != &fctx->__p->root);
765  callee = fctx->__p->sp->fiber;
766  caller = (--fctx->__p->sp)->fiber;
767  coro_transfer(&callee->ctx, &caller->ctx);
768 }
769 
770 int fbr_fd_nonblock(FBR_P_ int fd)
771 {
772  int flags, s;
773 
774  flags = fcntl(fd, F_GETFL, 0);
775  if (flags == -1)
776  return_error(-1, FBR_ESYSTEM);
777 
778  flags |= O_NONBLOCK;
779  s = fcntl(fd, F_SETFL, flags);
780  if (s == -1)
781  return_error(-1, FBR_ESYSTEM);
782 
783  return_success(0);
784 }
785 
786 static void ev_base_init(FBR_P_ struct fbr_ev_base *ev,
787  enum fbr_ev_type type)
788 {
789  memset(ev, 0x00, sizeof(*ev));
790  ev->type = type;
791  ev->id = CURRENT_FIBER_ID;
792  ev->fctx = fctx;
793 }
794 
795 void fbr_ev_watcher_init(FBR_P_ struct fbr_ev_watcher *ev, ev_watcher *w)
796 {
797  ev_base_init(FBR_A_ &ev->ev_base, FBR_EV_WATCHER);
798  ev->w = w;
799 }
800 
801 static void watcher_io_dtor(_unused_ FBR_P_ void *_arg)
802 {
803  struct ev_io *w = _arg;
804  ev_io_stop(fctx->__p->loop, w);
805 }
806 
807 int fbr_connect(FBR_P_ int sockfd, const struct sockaddr *addr,
808  socklen_t addrlen) {
809  ev_io io;
810  struct fbr_ev_watcher watcher;
811  struct fbr_destructor dtor = FBR_DESTRUCTOR_INITIALIZER;
812  int r;
813  socklen_t len;
814  r = connect(sockfd, addr, addrlen);
815  if ((-1 == r) && (EINPROGRESS != errno))
816  return -1;
817 
818  ev_io_init(&io, NULL, sockfd, EV_WRITE);
819  ev_io_start(fctx->__p->loop, &io);
820  dtor.func = watcher_io_dtor;
821  dtor.arg = &io;
822  fbr_destructor_add(FBR_A_ &dtor);
823  fbr_ev_watcher_init(FBR_A_ &watcher, (ev_watcher *)&io);
824  fbr_ev_wait_one(FBR_A_ &watcher.ev_base);
825 
826  len = sizeof(r);
827  if (-1 == getsockopt(sockfd, SOL_SOCKET, SO_ERROR, (void *)&r, &len)) {
828  r = -1;
829  } else if (0 != r) {
830  errno = r;
831  r = -1;
832  }
833 
834  fbr_destructor_remove(FBR_A_ &dtor, 0 /* Call it? */);
835  ev_io_stop(fctx->__p->loop, &io);
836  return r;
837 }
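
#if 0
/* Usage sketch (editorial illustration, not part of the original source):
 * connecting a TCP socket from a fiber. `addr`/`addrlen` are a hypothetical,
 * already-filled sockaddr; the socket should be non-blocking so that
 * connect() returns EINPROGRESS and the fiber can wait for writability. */
int fd = socket(AF_INET, SOCK_STREAM, 0);
if (-1 == fd)
	/* handle error */;
if (0 != fbr_fd_nonblock(FBR_A_ fd))
	/* handle error */;
if (-1 == fbr_connect(FBR_A_ fd, (struct sockaddr *)&addr, addrlen))
	fbr_log_e(FBR_A_ "connect failed: %s", strerror(errno));
#endif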
838 
839 ssize_t fbr_read(FBR_P_ int fd, void *buf, size_t count)
840 {
841  ssize_t r;
842  ev_io io;
843  struct fbr_ev_watcher watcher;
844  struct fbr_destructor dtor = FBR_DESTRUCTOR_INITIALIZER;
845 
846  ev_io_init(&io, NULL, fd, EV_READ);
847  ev_io_start(fctx->__p->loop, &io);
848  dtor.func = watcher_io_dtor;
849  dtor.arg = &io;
850  fbr_destructor_add(FBR_A_ &dtor);
851 
852  fbr_ev_watcher_init(FBR_A_ &watcher, (ev_watcher *)&io);
853  fbr_ev_wait_one(FBR_A_ &watcher.ev_base);
854 
855  fbr_destructor_remove(FBR_A_ &dtor, 0 /* Call it? */);
856  do {
857  r = read(fd, buf, count);
858  } while (-1 == r && EINTR == errno);
859 
860  ev_io_stop(fctx->__p->loop, &io);
861 
862  return r;
863 }
864 
865 ssize_t fbr_read_all(FBR_P_ int fd, void *buf, size_t count)
866 {
867  ssize_t r;
868  size_t done = 0;
869  ev_io io;
870  struct fbr_ev_watcher watcher;
871  struct fbr_destructor dtor = FBR_DESTRUCTOR_INITIALIZER;
872 
873  ev_io_init(&io, NULL, fd, EV_READ);
874  ev_io_start(fctx->__p->loop, &io);
875  dtor.func = watcher_io_dtor;
876  dtor.arg = &io;
877  fbr_destructor_add(FBR_A_ &dtor);
878 
879  fbr_ev_watcher_init(FBR_A_ &watcher, (ev_watcher *)&io);
880 
881  while (count != done) {
882 next:
883  fbr_ev_wait_one(FBR_A_ &watcher.ev_base);
884  for (;;) {
885  r = read(fd, buf + done, count - done);
886  if (-1 == r) {
887  switch (errno) {
888  case EINTR:
889  continue;
890  case EAGAIN:
891  goto next;
892  default:
893  goto error;
894  }
895  }
896  break;
897  }
898  if (0 == r)
899  break;
900  done += r;
901  }
902  fbr_destructor_remove(FBR_A_ &dtor, 0 /* Call it? */);
903  ev_io_stop(fctx->__p->loop, &io);
904  return (ssize_t)done;
905 
906 error:
907  fbr_destructor_remove(FBR_A_ &dtor, 0 /* Call it? */);
908  ev_io_stop(fctx->__p->loop, &io);
909  return -1;
910 }
911 
912 ssize_t fbr_readline(FBR_P_ int fd, void *buffer, size_t n)
913 {
914  ssize_t num_read;
915  size_t total_read;
916  char *buf;
917  char ch;
918 
919  if (n <= 0 || buffer == NULL) {
920  errno = EINVAL;
921  return -1;
922  }
923 
924  buf = buffer;
925 
926  total_read = 0;
927  for (;;) {
928  num_read = fbr_read(FBR_A_ fd, &ch, 1);
929 
930  if (num_read == -1) {
931  if (errno == EINTR)
932  continue;
933  else
934  return -1;
935 
936  } else if (num_read == 0) {
937  if (total_read == 0)
938  return 0;
939  else
940  break;
941 
942  } else {
943  if (total_read < n - 1) {
944  total_read++;
945  *buf++ = ch;
946  }
947 
948  if (ch == '\n')
949  break;
950  }
951  }
952 
953  *buf = '\0';
954  return total_read;
955 }
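
#if 0
/* Usage sketch (editorial illustration, not part of the original source):
 * echoing newline-terminated lines read from a hypothetical non-blocking
 * descriptor `conn_fd` inside a fiber. */
char line[256];
ssize_t len;

for (;;) {
	len = fbr_readline(FBR_A_ conn_fd, line, sizeof(line));
	if (len <= 0)
		break; /* EOF or error */
	fbr_write_all(FBR_A_ conn_fd, line, len);
}
#endif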
956 
957 ssize_t fbr_write(FBR_P_ int fd, const void *buf, size_t count)
958 {
959  ssize_t r;
960  ev_io io;
961  struct fbr_ev_watcher watcher;
962  struct fbr_destructor dtor = FBR_DESTRUCTOR_INITIALIZER;
963 
964  ev_io_init(&io, NULL, fd, EV_WRITE);
965  ev_io_start(fctx->__p->loop, &io);
966  dtor.func = watcher_io_dtor;
967  dtor.arg = &io;
968  fbr_destructor_add(FBR_A_ &dtor);
969 
970  fbr_ev_watcher_init(FBR_A_ &watcher, (ev_watcher *)&io);
971  fbr_ev_wait_one(FBR_A_ &watcher.ev_base);
972 
973  do {
974  r = write(fd, buf, count);
975  } while (-1 == r && EINTR == errno);
976 
977  fbr_destructor_remove(FBR_A_ &dtor, 0 /* Call it? */);
978  ev_io_stop(fctx->__p->loop, &io);
979  return r;
980 }
981 
982 ssize_t fbr_write_all(FBR_P_ int fd, const void *buf, size_t count)
983 {
984  ssize_t r;
985  size_t done = 0;
986  ev_io io;
987  struct fbr_ev_watcher watcher;
988  struct fbr_destructor dtor = FBR_DESTRUCTOR_INITIALIZER;
989 
990  ev_io_init(&io, NULL, fd, EV_WRITE);
991  ev_io_start(fctx->__p->loop, &io);
992  dtor.func = watcher_io_dtor;
993  dtor.arg = &io;
994  fbr_destructor_add(FBR_A_ &dtor);
995 
996  fbr_ev_watcher_init(FBR_A_ &watcher, (ev_watcher *)&io);
997 
998  while (count != done) {
999 next:
1000  fbr_ev_wait_one(FBR_A_ &watcher.ev_base);
1001  for (;;) {
1002  r = write(fd, buf + done, count - done);
1003  if (-1 == r) {
1004  switch (errno) {
1005  case EINTR:
1006  continue;
1007  case EAGAIN:
1008  goto next;
1009  default:
1010  goto error;
1011  }
1012  }
1013  break;
1014  }
1015  done += r;
1016  }
1017  fbr_destructor_remove(FBR_A_ &dtor, 0 /* Call it? */);
1018  ev_io_stop(fctx->__p->loop, &io);
1019  return (ssize_t)done;
1020 
1021 error:
1022  fbr_destructor_remove(FBR_A_ &dtor, 0 /* Call it? */);
1023  ev_io_stop(fctx->__p->loop, &io);
1024  return -1;
1025 }
1026 
1027 ssize_t fbr_recvfrom(FBR_P_ int sockfd, void *buf, size_t len, int flags,
1028  struct sockaddr *src_addr, socklen_t *addrlen)
1029 {
1030  ev_io io;
1031  struct fbr_ev_watcher watcher;
1032  struct fbr_destructor dtor = FBR_DESTRUCTOR_INITIALIZER;
1033 
1034  ev_io_init(&io, NULL, sockfd, EV_READ);
1035  ev_io_start(fctx->__p->loop, &io);
1036  dtor.func = watcher_io_dtor;
1037  dtor.arg = &io;
1038  fbr_destructor_add(FBR_A_ &dtor);
1039 
1040  fbr_ev_watcher_init(FBR_A_ &watcher, (ev_watcher *)&io);
1041  fbr_ev_wait_one(FBR_A_ &watcher.ev_base);
1042 
1043  fbr_destructor_remove(FBR_A_ &dtor, 0 /* Call it? */);
1044  ev_io_stop(fctx->__p->loop, &io);
1045 
1046  return recvfrom(sockfd, buf, len, flags, src_addr, addrlen);
1047 }
1048 
1049 ssize_t fbr_recv(FBR_P_ int sockfd, void *buf, size_t len, int flags)
1050 {
1051  ev_io io;
1052  struct fbr_ev_watcher watcher;
1053  struct fbr_destructor dtor = FBR_DESTRUCTOR_INITIALIZER;
1054 
1055  ev_io_init(&io, NULL, sockfd, EV_READ);
1056  ev_io_start(fctx->__p->loop, &io);
1057  dtor.func = watcher_io_dtor;
1058  dtor.arg = &io;
1059  fbr_destructor_add(FBR_A_ &dtor);
1060 
1061  fbr_ev_watcher_init(FBR_A_ &watcher, (ev_watcher *)&io);
1062  fbr_ev_wait_one(FBR_A_ &watcher.ev_base);
1063 
1064  fbr_destructor_remove(FBR_A_ &dtor, 0 /* Call it? */);
1065  ev_io_stop(fctx->__p->loop, &io);
1066 
1067  return recv(sockfd, buf, len, flags);
1068 }
1069 
1070 ssize_t fbr_sendto(FBR_P_ int sockfd, const void *buf, size_t len, int flags,
1071  const struct sockaddr *dest_addr, socklen_t addrlen)
1072 {
1073  ev_io io;
1074  struct fbr_ev_watcher watcher;
1075  struct fbr_destructor dtor = FBR_DESTRUCTOR_INITIALIZER;
1076 
1077  ev_io_init(&io, NULL, sockfd, EV_WRITE);
1078  ev_io_start(fctx->__p->loop, &io);
1079  dtor.func = watcher_io_dtor;
1080  dtor.arg = &io;
1081  fbr_destructor_add(FBR_A_ &dtor);
1082 
1083  fbr_ev_watcher_init(FBR_A_ &watcher, (ev_watcher *)&io);
1084  fbr_ev_wait_one(FBR_A_ &watcher.ev_base);
1085 
1086  fbr_destructor_remove(FBR_A_ &dtor, 0 /* Call it? */);
1087  ev_io_stop(fctx->__p->loop, &io);
1088 
1089  return sendto(sockfd, buf, len, flags, dest_addr, addrlen);
1090 }
1091 
1092 ssize_t fbr_send(FBR_P_ int sockfd, const void *buf, size_t len, int flags)
1093 {
1094  ev_io io;
1095  struct fbr_ev_watcher watcher;
1096  struct fbr_destructor dtor = FBR_DESTRUCTOR_INITIALIZER;
1097 
1098  ev_io_init(&io, NULL, sockfd, EV_WRITE);
1099  ev_io_start(fctx->__p->loop, &io);
1100  dtor.func = watcher_io_dtor;
1101  dtor.arg = &io;
1102  fbr_destructor_add(FBR_A_ &dtor);
1103 
1104  fbr_ev_watcher_init(FBR_A_ &watcher, (ev_watcher *)&io);
1105  fbr_ev_wait_one(FBR_A_ &watcher.ev_base);
1106 
1107  fbr_destructor_remove(FBR_A_ &dtor, 0 /* Call it? */);
1108  ev_io_stop(fctx->__p->loop, &io);
1109 
1110  return send(sockfd, buf, len, flags);
1111 }
1112 
1113 int fbr_accept(FBR_P_ int sockfd, struct sockaddr *addr, socklen_t *addrlen)
1114 {
1115  int r;
1116  ev_io io;
1117  struct fbr_ev_watcher watcher;
1118  struct fbr_destructor dtor = FBR_DESTRUCTOR_INITIALIZER;
1119 
1120  ev_io_init(&io, NULL, sockfd, EV_READ);
1121  ev_io_start(fctx->__p->loop, &io);
1122  dtor.func = watcher_io_dtor;
1123  dtor.arg = &io;
1124  fbr_destructor_add(FBR_A_ &dtor);
1125 
1126  fbr_ev_watcher_init(FBR_A_ &watcher, (ev_watcher *)&io);
1127  fbr_ev_wait_one(FBR_A_ &watcher.ev_base);
1128 
1129  do {
1130  r = accept(sockfd, addr, addrlen);
1131  } while (-1 == r && EINTR == errno);
1132 
1133  fbr_destructor_remove(FBR_A_ &dtor, 0 /* Call it? */);
1134  ev_io_stop(fctx->__p->loop, &io);
1135 
1136  return r;
1137 }
1138 
1139 ev_tstamp fbr_sleep(FBR_P_ ev_tstamp seconds)
1140 {
1141  ev_timer timer;
1142  struct fbr_ev_watcher watcher;
1143  struct fbr_destructor dtor = FBR_DESTRUCTOR_INITIALIZER;
1144  ev_tstamp expected = ev_now(fctx->__p->loop) + seconds;
1145 
1146  ev_timer_init(&timer, NULL, seconds, 0.);
1147  ev_timer_start(fctx->__p->loop, &timer);
1148  dtor.func = watcher_timer_dtor;
1149  dtor.arg = &timer;
1150  fbr_destructor_add(FBR_A_ &dtor);
1151 
1152  fbr_ev_watcher_init(FBR_A_ &watcher, (ev_watcher *)&timer);
1153  fbr_ev_wait_one(FBR_A_ &watcher.ev_base);
1154 
1155  fbr_destructor_remove(FBR_A_ &dtor, 0 /* Call it? */);
1156  ev_timer_stop(fctx->__p->loop, &timer);
1157 
1158  return max(0., expected - ev_now(fctx->__p->loop));
1159 }
1160 
1161 static long get_page_size()
1162 {
1163  static long sz;
1164  if (0 == sz)
1165  sz = sysconf(_SC_PAGESIZE);
1166  return sz;
1167 }
1168 
1169 static size_t round_up_to_page_size(size_t size)
1170 {
1171  long sz = get_page_size();
1172  size_t remainder;
1173  remainder = size % sz;
1174  if (remainder == 0)
1175  return size;
1176  return size + sz - remainder;
1177 }
1178 
1179 fbr_id_t fbr_create(FBR_P_ const char *name, fbr_fiber_func_t func, void *arg,
1180  size_t stack_size)
1181 {
1182  struct fbr_fiber *fiber;
1183  if (!LIST_EMPTY(&fctx->__p->reclaimed)) {
1184  fiber = LIST_FIRST(&fctx->__p->reclaimed);
1185  LIST_REMOVE(fiber, entries.reclaimed);
1186  } else {
1187  fiber = malloc(sizeof(struct fbr_fiber));
1188  memset(fiber, 0x00, sizeof(struct fbr_fiber));
1189  if (0 == stack_size)
1190  stack_size = FBR_STACK_SIZE;
1191  stack_size = round_up_to_page_size(stack_size);
1192  fiber->stack = mmap(NULL, stack_size, PROT_READ | PROT_WRITE,
1193  MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
1194  if (MAP_FAILED == fiber->stack)
1195  err(EXIT_FAILURE, "mmap failed");
1196  fiber->stack_size = stack_size;
1197  (void)VALGRIND_STACK_REGISTER(fiber->stack, fiber->stack +
1198  stack_size);
1199  fbr_cond_init(FBR_A_ &fiber->reclaim_cond);
1200  fiber->id = fctx->__p->last_id++;
1201  }
1202  coro_create(&fiber->ctx, (coro_func)call_wrapper, FBR_A, fiber->stack,
1203  fiber->stack_size);
1204  LIST_INIT(&fiber->children);
1205  LIST_INIT(&fiber->pool);
1206  TAILQ_INIT(&fiber->destructors);
1207  strncpy(fiber->name, name, FBR_MAX_FIBER_NAME);
1208  fiber->func = func;
1209  fiber->func_arg = arg;
1210  LIST_INSERT_HEAD(&CURRENT_FIBER->children, fiber, entries.children);
1211  fiber->parent = CURRENT_FIBER;
1212  fiber->no_reclaim = 0;
1213  fiber->want_reclaim = 0;
1214  return fbr_id_pack(fiber);
1215 }
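
#if 0
/* Usage sketch (editorial illustration, not part of the original source):
 * a minimal fiber body. FBR_P_/FBR_A_ thread the context through; when the
 * function returns, call_wrapper() above reclaims the fiber automatically.
 * `echo_fiber` and the descriptor passed via `arg` are hypothetical. */
static void echo_fiber(FBR_P_ void *arg)
{
	int fd = *(int *)arg;
	char buf[1024];
	ssize_t n;

	while ((n = fbr_read(FBR_A_ fd, buf, sizeof(buf))) > 0)
		fbr_write_all(FBR_A_ fd, buf, n);
}
/* ... fbr_id_t id = fbr_create(FBR_A_ "echo", echo_fiber, &fd, 0); ... */
#endif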
1216 
1217 int fbr_disown(FBR_P_ fbr_id_t parent_id)
1218 {
1219  struct fbr_fiber *fiber, *parent;
1220  if (!fbr_id_isnull(parent_id))
1221  unpack_transfer_errno(-1, &parent, parent_id);
1222  else
1223  parent = &fctx->__p->root;
1224  fiber = CURRENT_FIBER;
1225  LIST_REMOVE(fiber, entries.children);
1226  LIST_INSERT_HEAD(&parent->children, fiber, entries.children);
1227  fiber->parent = parent;
1228  return_success(0);
1229 }
1230 
1231 fbr_id_t fbr_parent(FBR_P)
1232 {
1233  struct fbr_fiber *fiber = CURRENT_FIBER;
1234  if (fiber->parent == &fctx->__p->root)
1235  return FBR_ID_NULL;
1236  return fbr_id_pack(fiber->parent);
1237 }
1238 
1239 void *fbr_calloc(FBR_P_ unsigned int nmemb, size_t size)
1240 {
1241  void *ptr;
1242  fprintf(stderr, "libevfibers: fbr_calloc is deprecated\n");
1243  ptr = allocate_in_fiber(FBR_A_ nmemb * size, CURRENT_FIBER);
1244  memset(ptr, 0x00, nmemb * size);
1245  return ptr;
1246 }
1247 
1248 void *fbr_alloc(FBR_P_ size_t size)
1249 {
1250  fprintf(stderr, "libevfibers: fbr_alloc is deprecated\n");
1251  return allocate_in_fiber(FBR_A_ size, CURRENT_FIBER);
1252 }
1253 
1254 void fbr_alloc_set_destructor(_unused_ FBR_P_ void *ptr,
1255  fbr_alloc_destructor_func_t func, void *context)
1256 {
1257  struct mem_pool *pool_entry;
1258  fprintf(stderr, "libevfibers:"
1259  " fbr_alloc_set_destructor is deprecated\n");
1260  pool_entry = (struct mem_pool *)ptr - 1;
1261  pool_entry->destructor = func;
1262  pool_entry->destructor_context = context;
1263 }
1264 
1265 void fbr_free(FBR_P_ void *ptr)
1266 {
1267  fprintf(stderr, "libevfibers: fbr_free is deprecated\n");
1268  fbr_free_in_fiber(FBR_A_ CURRENT_FIBER, ptr, 1);
1269 }
1270 
1271 void fbr_free_nd(FBR_P_ void *ptr)
1272 {
1273  fprintf(stderr, "libevfibers: fbr_free_nd is deprecated\n");
1274  fbr_free_in_fiber(FBR_A_ CURRENT_FIBER, ptr, 0);
1275 }
1276 
1277 void fbr_dump_stack(FBR_P_ fbr_logutil_func_t log)
1278 {
1279  struct fbr_stack_item *ptr = fctx->__p->sp;
1280  (*log)(FBR_A_ "%s", "Fiber call stack:");
1281  (*log)(FBR_A_ "%s", "-------------------------------");
1282  while (ptr >= fctx->__p->stack) {
1283  (*log)(FBR_A_ "fiber_call: %p\t%s",
1284  ptr->fiber,
1285  ptr->fiber->name);
1286  print_trace_info(FBR_A_ &ptr->tinfo, log);
1287  (*log)(FBR_A_ "%s", "-------------------------------");
1288  ptr--;
1289  }
1290 }
1291 
1292 static void transfer_later(FBR_P_ struct fbr_id_tailq_i *item)
1293 {
1294  int was_empty;
1295  was_empty = TAILQ_EMPTY(&fctx->__p->pending_fibers);
1296  TAILQ_INSERT_TAIL(&fctx->__p->pending_fibers, item, entries);
1297  item->head = &fctx->__p->pending_fibers;
1298  if (was_empty && !TAILQ_EMPTY(&fctx->__p->pending_fibers)) {
1299  ev_async_start(fctx->__p->loop, &fctx->__p->pending_async);
1300  }
1301  ev_async_send(fctx->__p->loop, &fctx->__p->pending_async);
1302 }
1303 
1304 static void transfer_later_tailq(FBR_P_ struct fbr_id_tailq *tailq)
1305 {
1306  int was_empty;
1307  struct fbr_id_tailq_i *item;
1308  TAILQ_FOREACH(item, tailq, entries) {
1309  item->head = &fctx->__p->pending_fibers;
1310  }
1311  was_empty = TAILQ_EMPTY(&fctx->__p->pending_fibers);
1312  TAILQ_CONCAT(&fctx->__p->pending_fibers, tailq, entries);
1313  if (was_empty && !TAILQ_EMPTY(&fctx->__p->pending_fibers)) {
1314  ev_async_start(fctx->__p->loop, &fctx->__p->pending_async);
1315  }
1316  ev_async_send(fctx->__p->loop, &fctx->__p->pending_async);
1317 }
1318 
1319 void fbr_ev_mutex_init(FBR_P_ struct fbr_ev_mutex *ev,
1320  struct fbr_mutex *mutex)
1321 {
1322  ev_base_init(FBR_A_ &ev->ev_base, FBR_EV_MUTEX);
1323  ev->mutex = mutex;
1324 }
1325 
1326 void fbr_mutex_init(_unused_ FBR_P_ struct fbr_mutex *mutex)
1327 {
1328  mutex->locked_by = FBR_ID_NULL;
1329  TAILQ_INIT(&mutex->pending);
1330 }
1331 
1332 void fbr_mutex_lock(FBR_P_ struct fbr_mutex *mutex)
1333 {
1334  struct fbr_ev_mutex ev;
1335 
1336  fbr_ev_mutex_init(FBR_A_ &ev, mutex);
1337  fbr_ev_wait_one(FBR_A_ &ev.ev_base);
1338  assert(fbr_id_eq(mutex->locked_by, CURRENT_FIBER_ID));
1339 }
1340 
1341 int fbr_mutex_trylock(FBR_P_ struct fbr_mutex *mutex)
1342 {
1343  if (fbr_id_isnull(mutex->locked_by)) {
1344  mutex->locked_by = CURRENT_FIBER_ID;
1345  return 1;
1346  }
1347  return 0;
1348 }
1349 
1350 void fbr_mutex_unlock(FBR_P_ struct fbr_mutex *mutex)
1351 {
1352  struct fbr_id_tailq_i *item, *x;
1353  struct fbr_fiber *fiber = NULL;
1354 
1355  if (TAILQ_EMPTY(&mutex->pending)) {
1356  mutex->locked_by = FBR_ID_NULL;
1357  return;
1358  }
1359 
1360  TAILQ_FOREACH_SAFE(item, &mutex->pending, entries, x) {
1361  assert(item->head == &mutex->pending);
1362  TAILQ_REMOVE(&mutex->pending, item, entries);
1363  if (-1 == fbr_id_unpack(FBR_A_ &fiber, item->id)) {
1364  fbr_log_e(FBR_A_ "libevfibers: unexpected error trying"
1365  " to find a fiber by id: %s",
1366  fbr_strerror(FBR_A_ fctx->f_errno));
1367  continue;
1368  }
1369  break;
1370  }
1371 
1372  mutex->locked_by = item->id;
1373  post_ev(FBR_A_ fiber, item->ev);
1374 
1375  transfer_later(FBR_A_ item);
1376 }
1377 
1378 void fbr_mutex_destroy(_unused_ FBR_P_ _unused_ struct fbr_mutex *mutex)
1379 {
1380  /* Since the mutex is stack-allocated now, this effectively turns into
1381  * a no-op. But we might consider adding some cleanup in the future.
1382  */
1383 }
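
#if 0
/* Usage sketch (editorial illustration, not part of the original source):
 * serializing access to shared state between fibers of one context. */
struct fbr_mutex mutex;
fbr_mutex_init(FBR_A_ &mutex);

/* in any fiber: */
fbr_mutex_lock(FBR_A_ &mutex);
/* ... touch shared state; other fibers block in fbr_mutex_lock() ... */
fbr_mutex_unlock(FBR_A_ &mutex);

fbr_mutex_destroy(FBR_A_ &mutex);
#endif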
1384 
1385 void fbr_ev_cond_var_init(FBR_P_ struct fbr_ev_cond_var *ev,
1386  struct fbr_cond_var *cond, struct fbr_mutex *mutex)
1387 {
1388  ev_base_init(FBR_A_ &ev->ev_base, FBR_EV_COND_VAR);
1389  ev->cond = cond;
1390  ev->mutex = mutex;
1391 }
1392 
1393 void fbr_cond_init(_unused_ FBR_P_ struct fbr_cond_var *cond)
1394 {
1395  cond->mutex = NULL;
1396  TAILQ_INIT(&cond->waiting);
1397 }
1398 
1399 void fbr_cond_destroy(_unused_ FBR_P_ _unused_ struct fbr_cond_var *cond)
1400 {
1401  /* Since the condvar is stack-allocated now, this effectively turns into
1402  * a no-op. But we might consider adding some cleanup in the future.
1403  */
1404 }
1405 
1406 int fbr_cond_wait(FBR_P_ struct fbr_cond_var *cond, struct fbr_mutex *mutex)
1407 {
1408  struct fbr_ev_cond_var ev;
1409 
1410  if (fbr_id_isnull(mutex->locked_by))
1411  return_error(-1, FBR_EINVAL);
1412 
1413  fbr_ev_cond_var_init(FBR_A_ &ev, cond, mutex);
1414  fbr_ev_wait_one(FBR_A_ &ev.ev_base);
1415  return_success(0);
1416 }
1417 
1418 void fbr_cond_broadcast(FBR_P_ struct fbr_cond_var *cond)
1419 {
1420  struct fbr_id_tailq_i *item;
1421  struct fbr_fiber *fiber;
1422  if (TAILQ_EMPTY(&cond->waiting))
1423  return;
1424  TAILQ_FOREACH(item, &cond->waiting, entries) {
1425  if(-1 == fbr_id_unpack(FBR_A_ &fiber, item->id)) {
1426  assert(FBR_ENOFIBER == fctx->f_errno);
1427  continue;
1428  }
1429  post_ev(FBR_A_ fiber, item->ev);
1430  }
1431  transfer_later_tailq(FBR_A_ &cond->waiting);
1432 }
1433 
1434 void fbr_cond_signal(FBR_P_ struct fbr_cond_var *cond)
1435 {
1436  struct fbr_id_tailq_i *item;
1437  struct fbr_fiber *fiber;
1438  if (TAILQ_EMPTY(&cond->waiting))
1439  return;
1440  item = TAILQ_FIRST(&cond->waiting);
1441  if(-1 == fbr_id_unpack(FBR_A_ &fiber, item->id)) {
1442  assert(FBR_ENOFIBER == fctx->f_errno);
1443  return;
1444  }
1445  post_ev(FBR_A_ fiber, item->ev);
1446 
1447  assert(item->head == &cond->waiting);
1448  TAILQ_REMOVE(&cond->waiting, item, entries);
1449  transfer_later(FBR_A_ item);
1450 }
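
#if 0
/* Usage sketch (editorial illustration, not part of the original source):
 * a waiter/signaller pair. `ready`, `cond` and `mutex` are hypothetical
 * shared objects; the mutex must be held when calling fbr_cond_wait(). */

/* waiting fiber */
fbr_mutex_lock(FBR_A_ &mutex);
while (!ready)
	fbr_cond_wait(FBR_A_ &cond, &mutex);
fbr_mutex_unlock(FBR_A_ &mutex);

/* signalling fiber */
fbr_mutex_lock(FBR_A_ &mutex);
ready = 1;
fbr_mutex_unlock(FBR_A_ &mutex);
fbr_cond_signal(FBR_A_ &cond);
#endif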
1451 
1452 int fbr_vrb_init(struct fbr_vrb *vrb, size_t size, const char *file_pattern)
1453 {
1454  int fd;
1455  size_t sz = get_page_size();
1456  size = (size ? round_up_to_page_size(size) : sz);
1457  void *ptr;
1458  char *temp_name;
1459 
1460  temp_name = strdup(file_pattern);
1461  //fctx->__p->vrb_file_pattern);
1462  vrb->mem_ptr_size = size * 2 + sz * 2;
1463  vrb->mem_ptr = mmap(NULL, vrb->mem_ptr_size, PROT_NONE,
1464  MAP_ANONYMOUS | MAP_PRIVATE, -1, 0);
1465  if (MAP_FAILED == vrb->mem_ptr)
1466  return -1;
1467  vrb->lower_ptr = vrb->mem_ptr + sz;
1468  vrb->upper_ptr = vrb->lower_ptr + size;
1469  vrb->ptr_size = size;
1470  vrb->data_ptr = vrb->lower_ptr;
1471  vrb->space_ptr = vrb->lower_ptr;
1472 
1473  fd = mkstemp(temp_name);
1474  if (0 > fd) {
1475  munmap(vrb->mem_ptr, vrb->mem_ptr_size);
1476  free(temp_name);
1477  return -1;
1478  }
1479 
1480  if (0 > unlink(temp_name)) {
1481  munmap(vrb->mem_ptr, vrb->mem_ptr_size);
1482  free(temp_name);
1483  close(fd);
1484  return -1;
1485  }
1486  free(temp_name);
1487 
1488  if (0 > ftruncate(fd, size)) {
1489  munmap(vrb->mem_ptr, vrb->mem_ptr_size);
1490  close(fd);
1491  return -1;
1492  }
1493 
1494  ptr = mmap(vrb->lower_ptr, size, PROT_READ | PROT_WRITE,
1495  MAP_FIXED | MAP_SHARED, fd, 0);
1496  if (MAP_FAILED == ptr) {
1497  munmap(vrb->mem_ptr, vrb->mem_ptr_size);
1498  close(fd);
1499  return -1;
1500  }
1501  if (ptr != vrb->lower_ptr) {
1502  munmap(vrb->lower_ptr, vrb->ptr_size);
1503  munmap(vrb->mem_ptr, vrb->mem_ptr_size);
1504  close(fd);
1505  return -1;
1506  }
1507 
1508  ptr = mmap(vrb->upper_ptr, size, PROT_READ | PROT_WRITE,
1509  MAP_FIXED | MAP_SHARED, fd, 0);
1510  if (MAP_FAILED == ptr) {
1511  munmap(vrb->lower_ptr, vrb->ptr_size);
1512  munmap(vrb->mem_ptr, vrb->mem_ptr_size);
1513  close(fd);
1514  return -1;
1515  }
1516 
1517  if (ptr != vrb->upper_ptr) {
1518  munmap(vrb->upper_ptr, vrb->ptr_size);
1519  munmap(vrb->lower_ptr, vrb->ptr_size);
1520  munmap(vrb->mem_ptr, vrb->mem_ptr_size);
1521  close(fd);
1522  return -1;
1523  }
1524 
1525  close(fd);
1526  return 0;
1527 }
1528 
1529 int fbr_buffer_init(FBR_P_ struct fbr_buffer *buffer, size_t size)
1530 {
1531  int rv;
1532  rv = fbr_vrb_init(&buffer->vrb, size, fctx->__p->buffer_file_pattern);
1533  if (rv)
1534  return_error(-1, FBR_EBUFFERMMAP);
1535 
1536  buffer->prepared_bytes = 0;
1537  buffer->waiting_bytes = 0;
1538  fbr_cond_init(FBR_A_ &buffer->committed_cond);
1539  fbr_cond_init(FBR_A_ &buffer->bytes_freed_cond);
1540  fbr_mutex_init(FBR_A_ &buffer->write_mutex);
1541  fbr_mutex_init(FBR_A_ &buffer->read_mutex);
1542  return_success(0);
1543 }
1544 
1545 void fbr_vrb_destroy(struct fbr_vrb *vrb)
1546 {
1547  munmap(vrb->upper_ptr, vrb->ptr_size);
1548  munmap(vrb->lower_ptr, vrb->ptr_size);
1549  munmap(vrb->mem_ptr, vrb->mem_ptr_size);
1550 }
1551 
1552 void fbr_buffer_destroy(FBR_P_ struct fbr_buffer *buffer)
1553 {
1554  fbr_vrb_destroy(&buffer->vrb);
1555 
1556  fbr_mutex_destroy(FBR_A_ &buffer->read_mutex);
1557  fbr_mutex_destroy(FBR_A_ &buffer->write_mutex);
1558  fbr_cond_destroy(FBR_A_ &buffer->committed_cond);
1559  fbr_cond_destroy(FBR_A_ &buffer->bytes_freed_cond);
1560 }
1561 
1562 void *fbr_buffer_alloc_prepare(FBR_P_ struct fbr_buffer *buffer, size_t size)
1563 {
1564  if (size > fbr_buffer_size(FBR_A_ buffer))
1565  return_error(NULL, FBR_EINVAL);
1566 
1567  fbr_mutex_lock(FBR_A_ &buffer->write_mutex);
1568 
1569  while (buffer->prepared_bytes > 0)
1570  fbr_cond_wait(FBR_A_ &buffer->committed_cond,
1571  &buffer->write_mutex);
1572 
1573  assert(0 == buffer->prepared_bytes);
1574 
1575  buffer->prepared_bytes = size;
1576 
1577  while (fbr_buffer_free_bytes(FBR_A_ buffer) < size)
1578  fbr_cond_wait(FBR_A_ &buffer->bytes_freed_cond,
1579  &buffer->write_mutex);
1580 
1581  return fbr_buffer_space_ptr(FBR_A_ buffer);
1582 }
1583 
1584 void fbr_buffer_alloc_commit(FBR_P_ struct fbr_buffer *buffer)
1585 {
1586  fbr_vrb_give(&buffer->vrb, buffer->prepared_bytes);
1587  buffer->prepared_bytes = 0;
1588  fbr_cond_signal(FBR_A_ &buffer->committed_cond);
1589  fbr_mutex_unlock(FBR_A_ &buffer->write_mutex);
1590 }
1591 
1592 void fbr_buffer_alloc_abort(FBR_P_ struct fbr_buffer *buffer)
1593 {
1594  buffer->prepared_bytes = 0;
1595  fbr_cond_signal(FBR_A_ &buffer->committed_cond);
1596  fbr_mutex_unlock(FBR_A_ &buffer->write_mutex);
1597 }
1598 
1599 void *fbr_buffer_read_address(FBR_P_ struct fbr_buffer *buffer, size_t size)
1600 {
1601  int retval;
1602  if (size > fbr_buffer_size(FBR_A_ buffer))
1603  return_error(NULL, FBR_EINVAL);
1604 
1605  fbr_mutex_lock(FBR_A_ &buffer->read_mutex);
1606 
1607  while (fbr_buffer_bytes(FBR_A_ buffer) < size) {
1608  retval = fbr_cond_wait(FBR_A_ &buffer->committed_cond,
1609  &buffer->read_mutex);
1610  assert(0 == retval);
1611  (void)retval;
1612  }
1613 
1614  buffer->waiting_bytes = size;
1615 
1616  return_success(fbr_buffer_data_ptr(FBR_A_ buffer));
1617 }
1618 
1619 void fbr_buffer_read_advance(FBR_P_ struct fbr_buffer *buffer)
1620 {
1621  fbr_vrb_take(&buffer->vrb, buffer->waiting_bytes);
1622 
1623  fbr_cond_signal(FBR_A_ &buffer->bytes_freed_cond);
1624  fbr_mutex_unlock(FBR_A_ &buffer->read_mutex);
1625 }
1626 
1627 void fbr_buffer_read_discard(FBR_P_ struct fbr_buffer *buffer)
1628 {
1629  fbr_mutex_unlock(FBR_A_ &buffer->read_mutex);
1630 }
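
#if 0
/* Usage sketch (editorial illustration, not part of the original source):
 * the producer/consumer protocol of struct fbr_buffer. `buffer`, `msg` and
 * `msg_size` are hypothetical; the buffer is assumed to have been set up
 * with fbr_buffer_init(). */

/* producer fiber: reserve, fill, commit */
void *out = fbr_buffer_alloc_prepare(FBR_A_ &buffer, msg_size);
memcpy(out, msg, msg_size);
fbr_buffer_alloc_commit(FBR_A_ &buffer);

/* consumer fiber: map msg_size contiguous bytes, then advance past them */
void *in = fbr_buffer_read_address(FBR_A_ &buffer, msg_size);
/* ... process msg_size bytes at `in` ... */
fbr_buffer_read_advance(FBR_A_ &buffer);
#endif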
1631 
1632 int fbr_buffer_resize(FBR_P_ struct fbr_buffer *buffer, size_t size)
1633 {
1634  int rv;
1635  fbr_mutex_lock(FBR_A_ &buffer->read_mutex);
1636  fbr_mutex_lock(FBR_A_ &buffer->write_mutex);
1637  rv = fbr_vrb_resize(&buffer->vrb, size, fctx->__p->buffer_file_pattern);
1638  fbr_mutex_unlock(FBR_A_ &buffer->write_mutex);
1639  fbr_mutex_unlock(FBR_A_ &buffer->read_mutex);
1640  if (rv)
1641  return_error(-1, FBR_EBUFFERMMAP);
1642  return_success(0);
1643 }
1644 
1645 void *fbr_get_user_data(FBR_P_ fbr_id_t id)
1646 {
1647  struct fbr_fiber *fiber;
1648  unpack_transfer_errno(NULL, &fiber, id);
1649  return_success(fiber->user_data);
1650 }
1651 
1652 int fbr_set_user_data(FBR_P_ fbr_id_t id, void *data)
1653 {
1654  struct fbr_fiber *fiber;
1655  unpack_transfer_errno(-1, &fiber, id);
1656  fiber->user_data = data;
1657  return_success(0);
1658 }
1659 
1660 void fbr_destructor_add(FBR_P_ struct fbr_destructor *dtor)
1661 {
1662  struct fbr_fiber *fiber = CURRENT_FIBER;
1663  TAILQ_INSERT_TAIL(&fiber->destructors, dtor, entries);
1664  dtor->active = 1;
1665 }
1666 
1667 void fbr_destructor_remove(FBR_P_ struct fbr_destructor *dtor,
1668  int call)
1669 {
1670  struct fbr_fiber *fiber = CURRENT_FIBER;
1671 
1672  if (0 == dtor->active)
1673  return;
1674 
1675  TAILQ_REMOVE(&fiber->destructors, dtor, entries);
1676  if (call)
1677  dtor->func(FBR_A_ dtor->arg);
1678  dtor->active = 0;
1679 }
1680 
1681 static inline int wrap_ffsll(uint64_t val)
1682 {
1683  /* TODO: Add some check for the existence of this builtin */
1684  return __builtin_ffsll(val);
1685 }
1686 
1687 static inline int is_key_registered(FBR_P_ fbr_key_t key)
1688 {
1689  return 0 == (fctx->__p->key_free_mask & (1ULL << key));
1690 }
1691 
1692 static inline void register_key(FBR_P_ fbr_key_t key)
1693 {
1694  fctx->__p->key_free_mask &= ~(1ULL << key);
1695 }
1696 
1697 static inline void unregister_key(FBR_P_ fbr_key_t key)
1698 {
1699  fctx->__p->key_free_mask |= (1ULL << key);
1700 }
1701 
1702 int fbr_key_create(FBR_P_ fbr_key_t *key_ptr)
1703 {
1704  fbr_key_t key = wrap_ffsll(fctx->__p->key_free_mask) - 1;
1705  assert(key < FBR_MAX_KEY);
1706  register_key(FBR_A_ key);
1707  *key_ptr = key;
1708  return_success(0);
1709 }
1710 
1711 int fbr_key_delete(FBR_P_ fbr_key_t key)
1712 {
1713  if (!is_key_registered(FBR_A_ key))
1714  return_error(-1, FBR_ENOKEY);
1715 
1716  unregister_key(FBR_A_ key);
1717 
1718  return_success(0);
1719 }
1720 
1721 int fbr_key_set(FBR_P_ fbr_id_t id, fbr_key_t key, void *value)
1722 {
1723  struct fbr_fiber *fiber;
1724 
1725  unpack_transfer_errno(-1, &fiber, id);
1726 
1727  if (!is_key_registered(FBR_A_ key))
1728  return_error(-1, FBR_ENOKEY);
1729 
1730  fiber->key_data[key] = value;
1731  return_success(0);
1732 }
1733 
1734 void *fbr_key_get(FBR_P_ fbr_id_t id, fbr_key_t key)
1735 {
1736  struct fbr_fiber *fiber;
1737 
1738  unpack_transfer_errno(NULL, &fiber, id);
1739 
1740  if (!is_key_registered(FBR_A_ key))
1741  return_error(NULL, FBR_ENOKEY);
1742 
1743  return fiber->key_data[key];
1744 }
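
#if 0
/* Usage sketch (editorial illustration, not part of the original source):
 * fiber-local storage. Each fiber may store its own pointer under the same
 * key; `my_state` is a hypothetical per-fiber structure. */
fbr_key_t key;
fbr_key_create(FBR_A_ &key);
fbr_key_set(FBR_A_ fbr_self(FBR_A), key, my_state);
struct my_state *state = fbr_key_get(FBR_A_ fbr_self(FBR_A), key);
fbr_key_delete(FBR_A_ key);
#endif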
1745 
1746 const char *fbr_get_name(FBR_P_ fbr_id_t id)
1747 {
1748  struct fbr_fiber *fiber;
1749  unpack_transfer_errno(NULL, &fiber, id);
1750  return_success(fiber->name);
1751 }
1752 
1753 int fbr_set_name(FBR_P_ fbr_id_t id, const char *name)
1754 {
1755  struct fbr_fiber *fiber;
1756  unpack_transfer_errno(-1, &fiber, id);
1757  strncpy(fiber->name, name, FBR_MAX_FIBER_NAME);
1758  return_success(0);
1759 }
1760 
1761 static int make_pipe(FBR_P_ int *r, int*w)
1762 {
1763  int fds[2];
1764  int retval;
1765  retval = pipe(fds);
1766  if (-1 == retval)
1767  return_error(-1, FBR_ESYSTEM);
1768  *r = fds[0];
1769  *w = fds[1];
1770  return_success(0);
1771 }
1772 
1773 pid_t fbr_popen3(FBR_P_ const char *filename, char *const argv[],
1774  char *const envp[], const char *working_dir,
1775  int *stdin_w_ptr, int *stdout_r_ptr, int *stderr_r_ptr)
1776 {
1777  pid_t pid;
1778  int stdin_r = 0, stdin_w = 0;
1779  int stdout_r = 0, stdout_w = 0;
1780  int stderr_r = 0, stderr_w = 0;
1781  int devnull;
1782  int retval;
1783 
1784  if (!stdin_w_ptr || !stdout_r_ptr || !stderr_r_ptr)
1785  devnull = open("/dev/null", O_WRONLY);
1786 
1787  retval = (stdin_w_ptr ? make_pipe(FBR_A_ &stdin_r, &stdin_w) : 0);
1788  if (retval)
1789  return retval;
1790  retval = (stdout_r_ptr ? make_pipe(FBR_A_ &stdout_r, &stdout_w) : 0);
1791  if (retval)
1792  return retval;
1793  retval = (stderr_r_ptr ? make_pipe(FBR_A_ &stderr_r, &stderr_w) : 0);
1794  if (retval)
1795  return retval;
1796 
1797  pid = fork();
1798  if (-1 == pid)
1799  return_error(-1, FBR_ESYSTEM);
1800  if (0 == pid) {
1801  /* Child */
1802  ev_break(EV_DEFAULT, EVBREAK_ALL);
1803  if (stdin_w_ptr) {
1804  retval = close(stdin_w);
1805  if (-1 == retval)
1806  err(EXIT_FAILURE, "close");
1807  retval = dup2(stdin_r, STDIN_FILENO);
1808  if (-1 == retval)
1809  err(EXIT_FAILURE, "dup2");
1810  } else {
1811  devnull = open("/dev/null", O_RDONLY);
1812  if (-1 == devnull)
1813  err(EXIT_FAILURE, "open");
1814  retval = dup2(devnull, STDIN_FILENO);
1815  if (-1 == retval)
1816  err(EXIT_FAILURE, "dup2");
1817  }
1818  if (stdout_r_ptr) {
1819  retval = close(stdout_r);
1820  if (-1 == retval)
1821  err(EXIT_FAILURE, "close");
1822  retval = dup2(stdout_w, STDOUT_FILENO);
1823  if (-1 == retval)
1824  err(EXIT_FAILURE, "dup2");
1825  } else {
1826  devnull = open("/dev/null", O_WRONLY);
1827  if (-1 == devnull)
1828  err(EXIT_FAILURE, "open");
1829  retval = dup2(devnull, STDOUT_FILENO);
1830  if (-1 == retval)
1831  err(EXIT_FAILURE, "dup2");
1832  }
1833  if (stderr_r_ptr) {
1834  retval = close(stderr_r);
1835  if (-1 == retval)
1836  err(EXIT_FAILURE, "close");
1837  retval = dup2(stderr_w, STDERR_FILENO);
1838  if (-1 == retval)
1839  err(EXIT_FAILURE, "dup2");
1840  } else {
1841  devnull = open("/dev/null", O_WRONLY);
1842  if (-1 == devnull)
1843  err(EXIT_FAILURE, "open");
1844  retval = dup2(devnull, STDERR_FILENO);
1845  if (-1 == retval)
1846  err(EXIT_FAILURE, "dup2");
1847  }
1848 
1849  if (working_dir) {
1850  retval = chdir(working_dir);
1851  if (-1 == retval)
1852  err(EXIT_FAILURE, "chdir");
1853  }
1854 
1855  retval = execve(filename, argv, envp);
1856  if (-1 == retval)
1857  err(EXIT_FAILURE, "execve");
1858 
1859  errx(EXIT_FAILURE, "execve failed without error code");
1860  }
1861  /* Parent */
1862  if (stdin_w_ptr) {
1863  retval = close(stdin_r);
1864  if (-1 == retval)
1865  return_error(-1, FBR_ESYSTEM);
1866  retval = fbr_fd_nonblock(FBR_A_ stdin_w);
1867  if (retval)
1868  return retval;
1869  }
1870  if (stdout_r_ptr) {
1871  retval = close(stdout_w);
1872  if (-1 == retval)
1873  return_error(-1, FBR_ESYSTEM);
1874  retval = fbr_fd_nonblock(FBR_A_ stdout_r);
1875  if (retval)
1876  return retval;
1877  }
1878  if (stderr_r_ptr) {
1879  retval = close(stderr_w);
1880  if (-1 == retval)
1881  return_error(-1, FBR_ESYSTEM);
1882  retval = fbr_fd_nonblock(FBR_A_ stderr_r);
1883  if (retval)
1884  return retval;
1885  }
1886 
1887  fbr_log_d(FBR_A_ "child pid %d has been launched", pid);
1888  if (stdin_w_ptr)  *stdin_w_ptr = stdin_w;
1889  if (stdout_r_ptr) *stdout_r_ptr = stdout_r;
1890  if (stderr_r_ptr) *stderr_r_ptr = stderr_r;
1891  return pid;
1892 }
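
#if 0
/* Usage sketch (editorial illustration, not part of the original source):
 * spawning a child process and draining its standard output from a fiber.
 * `argv` and `envp` are hypothetical NULL-terminated arrays. */
int stdin_w, stdout_r, stderr_r;
char buf[4096];
ssize_t n;
pid_t pid;

pid = fbr_popen3(FBR_A_ "/bin/ls", argv, envp, NULL,
		&stdin_w, &stdout_r, &stderr_r);
if (pid > 0) {
	while ((n = fbr_read(FBR_A_ stdout_r, buf, sizeof(buf))) > 0)
		; /* ... process n bytes of child output in buf ... */
	fbr_waitpid(FBR_A_ pid);
}
#endif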
1893 
1894 static void watcher_child_dtor(_unused_ FBR_P_ void *_arg)
1895 {
1896  struct ev_child *w = _arg;
1897  ev_child_stop(fctx->__p->loop, w);
1898 }
1899 
1900 int fbr_waitpid(FBR_P_ pid_t pid)
1901 {
1902  struct ev_child child;
1903  struct fbr_ev_watcher watcher;
1904  struct fbr_destructor dtor = FBR_DESTRUCTOR_INITIALIZER;
1905  ev_child_init(&child, NULL, pid, 0.);
1906  ev_child_start(fctx->__p->loop, &child);
1907  dtor.func = watcher_child_dtor;
1908  dtor.arg = &child;
1909  fbr_destructor_add(FBR_A_ &dtor);
1910 
1911  fbr_ev_watcher_init(FBR_A_ &watcher, (ev_watcher *)&child);
1912  fbr_ev_wait_one(FBR_A_ &watcher.ev_base);
1913 
1914  fbr_destructor_remove(FBR_A_ &dtor, 0 /* Call it? */);
1915  ev_child_stop(fctx->__p->loop, &child);
1916  return_success(child.rstatus);
1917 }
1918 
1919 int fbr_system(FBR_P_ const char *filename, char *const argv[],
1920  char *const envp[], const char *working_dir)
1921 {
1922  pid_t pid;
1923  int retval;
1924 
1925  pid = fork();
1926  if (-1 == pid)
1927  return_error(-1, FBR_ESYSTEM);
1928  if (0 == pid) {
1929  /* Child */
1930  ev_break(EV_DEFAULT, EVBREAK_ALL);
1931 
1932  if (working_dir) {
1933  retval = chdir(working_dir);
1934  if (-1 == retval)
1935  err(EXIT_FAILURE, "chdir");
1936  }
1937 
1938  retval = execve(filename, argv, envp);
1939  if (-1 == retval)
1940  err(EXIT_FAILURE, "execve");
1941 
1942  errx(EXIT_FAILURE, "execve failed without error code");
1943  }
1944  /* Parent */
1945 
1946  fbr_log_d(FBR_A_ "child pid %d has been launched", pid);
1947  return fbr_waitpid(FBR_A_ pid);
1948 }
1949 
1950 #ifdef FBR_EIO_ENABLED
1951 
1952 static struct ev_loop *eio_loop;
1953 static ev_idle repeat_watcher;
1954 static ev_async ready_watcher;
1955 
1956 /* idle watcher callback, only used when eio_poll */
1957 /* didn't handle all results in one call */
1958 static void repeat(EV_P_ ev_idle *w, _unused_ int revents)
1959 {
1960  if (eio_poll () != -1)
1961  ev_idle_stop(EV_A_ w);
1962 }
1963 
1964 /* eio has some results, process them */
1965 static void ready(EV_P_ _unused_ ev_async *w, _unused_ int revents)
1966 {
1967  if (eio_poll() == -1)
1968  ev_idle_start(EV_A_ &repeat_watcher);
1969 }
1970 
1971 /* wake up the event loop */
1972 static void want_poll()
1973 {
1974  ev_async_send(eio_loop, &ready_watcher);
1975 }
1976 
1977 void fbr_eio_init()
1978 {
1979  if (NULL != eio_loop) {
1980  fprintf(stderr, "libevfibers: fbr_eio_init called twice\n");
1981  abort();
1982  }
1983  eio_loop = EV_DEFAULT;
1984  ev_idle_init(&repeat_watcher, repeat);
1985  ev_async_init(&ready_watcher, ready);
1986  ev_async_start(eio_loop, &ready_watcher);
1987  ev_unref(eio_loop);
1988  eio_init(want_poll, 0);
1989 }
1990 
1991 void fbr_ev_eio_init(FBR_P_ struct fbr_ev_eio *ev, eio_req *req)
1992 {
1993  ev_base_init(FBR_A_ &ev->ev_base, FBR_EV_EIO);
1994  ev->req = req;
1995 }
1996 
1997 static void eio_req_dtor(_unused_ FBR_P_ void *_arg)
1998 {
1999  eio_req *req = _arg;
2000  eio_cancel(req);
2001 }
2002 
2003 static int fiber_eio_cb(eio_req *req)
2004 {
2005  struct fbr_fiber *fiber;
2006  struct fbr_ev_eio *ev = req->data;
2007  struct fbr_context *fctx = ev->ev_base.fctx;
2008  int retval;
2009 
2010  ENSURE_ROOT_FIBER;
2011 
2012  ev_unref(eio_loop);
2013  if (EIO_CANCELLED(req))
2014  return 0;
2015 
2016  retval = fbr_id_unpack(FBR_A_ &fiber, ev->ev_base.id);
2017  if (-1 == retval) {
2018  fbr_log_e(FBR_A_ "libevfibers: fiber is about to be called by"
2019  " the eio callback, but it's id is not valid: %s",
2020  fbr_strerror(FBR_A_ fctx->f_errno));
2021  abort();
2022  }
2023 
2024  post_ev(FBR_A_ fiber, &ev->ev_base);
2025 
2026  retval = fbr_transfer(FBR_A_ fbr_id_pack(fiber));
2027  assert(0 == retval);
2028  return 0;
2029 }
2030 
2031 #define FBR_EIO_PREP \
2032  eio_req *req; \
2033  struct fbr_ev_eio e_eio; \
2034  int retval; \
2035  struct fbr_destructor dtor = FBR_DESTRUCTOR_INITIALIZER; \
2036  ev_ref(eio_loop);
2037 
2038 #define FBR_EIO_WAIT \
2039  if (NULL == req) { \
2040  ev_unref(eio_loop); \
2041  return_error(-1, FBR_EEIO); \
2042  } \
2043  dtor.func = eio_req_dtor; \
2044  dtor.arg = req; \
2045  fbr_destructor_add(FBR_A_ &dtor); \
2046  fbr_ev_eio_init(FBR_A_ &e_eio, req); \
2047  retval = fbr_ev_wait_one(FBR_A_ &e_eio.ev_base); \
2048  fbr_destructor_remove(FBR_A_ &dtor, 0 /* Call it? */); \
2049  if (retval) \
2050  return retval;
2051 
2052 #define FBR_EIO_RESULT_CHECK \
2053  if (0 > req->result) { \
2054  errno = req->errorno; \
2055  return_error(-1, FBR_ESYSTEM); \
2056  }
2057 
2058 #define FBR_EIO_RESULT_RET \
2059  FBR_EIO_RESULT_CHECK \
2060  return req->result;
2061 
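/* Added annotation: each fbr_eio_* wrapper below follows the same pattern:
 * FBR_EIO_PREP declares the request and takes a loop reference, the eio_*
 * call submits the request with fiber_eio_cb as its callback, FBR_EIO_WAIT
 * registers a destructor (so the request is cancelled if the fiber is
 * reclaimed) and blocks the fiber until the callback fires, and
 * FBR_EIO_RESULT_CHECK/RET translate a negative req->result into
 * FBR_ESYSTEM with errno taken from the request.
 */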
2062 int fbr_eio_open(FBR_P_ const char *path, int flags, mode_t mode, int pri)
2063 {
2064  FBR_EIO_PREP;
2065  req = eio_open(path, flags, mode, pri, fiber_eio_cb, &e_eio);
2066  FBR_EIO_WAIT;
2067  FBR_EIO_RESULT_RET;
2068 }
2069 
2070 int fbr_eio_truncate(FBR_P_ const char *path, off_t offset, int pri)
2071 {
2072  FBR_EIO_PREP;
2073  req = eio_truncate(path, offset, pri, fiber_eio_cb, &e_eio);
2074  FBR_EIO_WAIT;
2075  FBR_EIO_RESULT_RET;
2076 }
2077 
2078 int fbr_eio_chown(FBR_P_ const char *path, uid_t uid, gid_t gid, int pri)
2079 {
2080  FBR_EIO_PREP;
2081  req = eio_chown(path, uid, gid, pri, fiber_eio_cb, &e_eio);
2082  FBR_EIO_WAIT;
2083  FBR_EIO_RESULT_RET;
2084 }
2085 
2086 int fbr_eio_chmod(FBR_P_ const char *path, mode_t mode, int pri)
2087 {
2088  FBR_EIO_PREP;
2089  req = eio_chmod(path, mode, pri, fiber_eio_cb, &e_eio);
2090  FBR_EIO_WAIT;
2091  FBR_EIO_RESULT_RET;
2092 }
2093 
2094 int fbr_eio_mkdir(FBR_P_ const char *path, mode_t mode, int pri)
2095 {
2096  FBR_EIO_PREP;
2097  req = eio_mkdir(path, mode, pri, fiber_eio_cb, &e_eio);
2098  FBR_EIO_WAIT;
2099  FBR_EIO_RESULT_RET;
2100 }
2101 
2102 int fbr_eio_rmdir(FBR_P_ const char *path, int pri)
2103 {
2104  FBR_EIO_PREP;
2105  req = eio_rmdir(path, pri, fiber_eio_cb, &e_eio);
2106  FBR_EIO_WAIT;
2107  FBR_EIO_RESULT_RET;
2108 }
2109 
2110 int fbr_eio_unlink(FBR_P_ const char *path, int pri)
2111 {
2112  FBR_EIO_PREP;
2113  req = eio_unlink(path, pri, fiber_eio_cb, &e_eio);
2114  FBR_EIO_WAIT;
2115  FBR_EIO_RESULT_RET;
2116 }
2117 
2118 int fbr_eio_utime(FBR_P_ const char *path, eio_tstamp atime, eio_tstamp mtime,
2119  int pri)
2120 {
2121  FBR_EIO_PREP;
2122  req = eio_utime(path, atime, mtime, pri, fiber_eio_cb, &e_eio);
2123  FBR_EIO_WAIT;
2124  FBR_EIO_RESULT_RET;
2125 }
2126 
2127 int fbr_eio_mknod(FBR_P_ const char *path, mode_t mode, dev_t dev, int pri)
2128 {
2129  FBR_EIO_PREP;
2130  req = eio_mknod(path, mode, dev, pri, fiber_eio_cb, &e_eio);
2131  FBR_EIO_WAIT;
2132  FBR_EIO_RESULT_RET;
2133 }
2134 
2135 int fbr_eio_link(FBR_P_ const char *path, const char *new_path, int pri)
2136 {
2137  FBR_EIO_PREP;
2138  req = eio_link(path, new_path, pri, fiber_eio_cb, &e_eio);
2139  FBR_EIO_WAIT;
2140  FBR_EIO_RESULT_RET;
2141 }
2142 
2143 int fbr_eio_symlink(FBR_P_ const char *path, const char *new_path, int pri)
2144 {
2145  FBR_EIO_PREP;
2146  req = eio_symlink(path, new_path, pri, fiber_eio_cb, &e_eio);
2147  FBR_EIO_WAIT;
2148  FBR_EIO_RESULT_RET;
2149 }
2150 
2151 int fbr_eio_rename(FBR_P_ const char *path, const char *new_path, int pri)
2152 {
2153  FBR_EIO_PREP;
2154  req = eio_rename(path, new_path, pri, fiber_eio_cb, &e_eio);
2155  FBR_EIO_WAIT;
2156  FBR_EIO_RESULT_RET;
2157 }
2158 
2159 int fbr_eio_mlock(FBR_P_ void *addr, size_t length, int pri)
2160 {
2161  FBR_EIO_PREP;
2162  req = eio_mlock(addr, length, pri, fiber_eio_cb, &e_eio);
2163  FBR_EIO_WAIT;
2164  FBR_EIO_RESULT_RET;
2165 }
2166 
2167 int fbr_eio_close(FBR_P_ int fd, int pri)
2168 {
2169  FBR_EIO_PREP;
2170  req = eio_close(fd, pri, fiber_eio_cb, &e_eio);
2171  FBR_EIO_WAIT;
2172  FBR_EIO_RESULT_RET;
2173 }
2174 
2175 int fbr_eio_sync(FBR_P_ int pri)
2176 {
2177  FBR_EIO_PREP;
2178  req = eio_sync(pri, fiber_eio_cb, &e_eio);
2179  FBR_EIO_WAIT;
2180  FBR_EIO_RESULT_RET;
2181 }
2182 
2183 int fbr_eio_fsync(FBR_P_ int fd, int pri)
2184 {
2185  FBR_EIO_PREP;
2186  req = eio_fsync(fd, pri, fiber_eio_cb, &e_eio);
2187  FBR_EIO_WAIT;
2188  FBR_EIO_RESULT_RET;
2189 }
2190 
2191 int fbr_eio_fdatasync(FBR_P_ int fd, int pri)
2192 {
2193  FBR_EIO_PREP;
2194  req = eio_fdatasync(fd, pri, fiber_eio_cb, &e_eio);
2195  FBR_EIO_WAIT;
2196  FBR_EIO_RESULT_RET;
2197 }
2198 
2199 int fbr_eio_futime(FBR_P_ int fd, eio_tstamp atime, eio_tstamp mtime, int pri)
2200 {
2201  FBR_EIO_PREP;
2202  req = eio_futime(fd, atime, mtime, pri, fiber_eio_cb, &e_eio);
2203  FBR_EIO_WAIT;
2204  FBR_EIO_RESULT_RET;
2205 }
2206 
2207 int fbr_eio_ftruncate(FBR_P_ int fd, off_t offset, int pri)
2208 {
2209  FBR_EIO_PREP;
2210  req = eio_ftruncate(fd, offset, pri, fiber_eio_cb, &e_eio);
2211  FBR_EIO_WAIT;
2212  FBR_EIO_RESULT_RET;
2213 }
2214 
2215 int fbr_eio_fchmod(FBR_P_ int fd, mode_t mode, int pri)
2216 {
2217  FBR_EIO_PREP;
2218  req = eio_fchmod(fd, mode, pri, fiber_eio_cb, &e_eio);
2219  FBR_EIO_WAIT;
2220  FBR_EIO_RESULT_RET;
2221 }
2222 
2223 int fbr_eio_fchown(FBR_P_ int fd, uid_t uid, gid_t gid, int pri)
2224 {
2225  FBR_EIO_PREP;
2226  req = eio_fchown(fd, uid, gid, pri, fiber_eio_cb, &e_eio);
2227  FBR_EIO_WAIT;
2228  FBR_EIO_RESULT_RET;
2229 }
2230 
2231 int fbr_eio_dup2(FBR_P_ int fd, int fd2, int pri)
2232 {
2233  FBR_EIO_PREP;
2234  req = eio_dup2(fd, fd2, pri, fiber_eio_cb, &e_eio);
2235  FBR_EIO_WAIT;
2236  FBR_EIO_RESULT_RET;
2237 }
2238 
2239 ssize_t fbr_eio_seek(FBR_P_ int fd, off_t offset, int whence, int pri)
2240 {
2241  FBR_EIO_PREP;
2242  req = eio_seek(fd, offset, whence, pri, fiber_eio_cb, &e_eio);
2243  FBR_EIO_WAIT;
2244  FBR_EIO_RESULT_CHECK;
2245  return req->offs;
2246 }
2247 
2248 ssize_t fbr_eio_read(FBR_P_ int fd, void *buf, size_t length, off_t offset,
2249  int pri)
2250 {
2251  FBR_EIO_PREP;
2252  req = eio_read(fd, buf, length, offset, pri, fiber_eio_cb, &e_eio);
2253  FBR_EIO_WAIT;
2254  FBR_EIO_RESULT_RET;
2255 }
2256 
2257 ssize_t fbr_eio_write(FBR_P_ int fd, void *buf, size_t length, off_t offset,
2258  int pri)
2259 {
2260  FBR_EIO_PREP;
2261  req = eio_write(fd, buf, length, offset, pri, fiber_eio_cb, &e_eio);
2262  FBR_EIO_WAIT;
2263  FBR_EIO_RESULT_RET;
2264 }
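/* Usage sketch (not part of the original source): reading a file without
 * blocking the event loop could look roughly like this inside a fiber. The
 * fiber function name is hypothetical and short reads are not handled.
 */
#if 0
static void read_file_fiber(FBR_P_ _unused_ void *_arg)
{
        char buf[4096];
        int fd;
        ssize_t nread;

        fd = fbr_eio_open(FBR_A_ "/etc/hostname", O_RDONLY, 0, 0);
        if (fd < 0)
                return;
        nread = fbr_eio_read(FBR_A_ fd, buf, sizeof(buf), 0, 0);
        if (nread > 0)
                fbr_log_d(FBR_A_ "read %zd bytes", nread);
        fbr_eio_close(FBR_A_ fd, 0);
}
#endif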
2265 
2266 int fbr_eio_mlockall(FBR_P_ int flags, int pri)
2267 {
2268  FBR_EIO_PREP;
2269  req = eio_mlockall(flags, pri, fiber_eio_cb, &e_eio);
2270  FBR_EIO_WAIT;
2271  FBR_EIO_RESULT_RET;
2272 }
2273 
2274 int fbr_eio_msync(FBR_P_ void *addr, size_t length, int flags, int pri)
2275 {
2276  FBR_EIO_PREP;
2277  req = eio_msync(addr, length, flags, pri, fiber_eio_cb, &e_eio);
2278  FBR_EIO_WAIT;
2279  FBR_EIO_RESULT_RET;
2280 }
2281 
2282 int fbr_eio_readlink(FBR_P_ const char *path, char *buf, size_t size, int pri)
2283 {
2284  FBR_EIO_PREP;
2285  req = eio_readlink(path, pri, fiber_eio_cb, &e_eio);
2286  FBR_EIO_WAIT;
2287  FBR_EIO_RESULT_CHECK;
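 /* Added annotation: as with readlink(2), the path copied into buf is not
  * guaranteed to be NUL-terminated; callers should rely on the returned
  * length. The same applies to fbr_eio_realpath() below. */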
2288  strncpy(buf, req->ptr2, min(size, (size_t)req->result));
2289  return req->result;
2290 }
2291 
2292 int fbr_eio_realpath(FBR_P_ const char *path, char *buf, size_t size, int pri)
2293 {
2294  FBR_EIO_PREP;
2295  req = eio_realpath(path, pri, fiber_eio_cb, &e_eio);
2296  FBR_EIO_WAIT;
2297  FBR_EIO_RESULT_CHECK;
2298  strncpy(buf, req->ptr2, min(size, (size_t)req->result));
2299  return req->result;
2300 }
2301 
2302 int fbr_eio_stat(FBR_P_ const char *path, EIO_STRUCT_STAT *statdata, int pri)
2303 {
2304  EIO_STRUCT_STAT *st;
2305  FBR_EIO_PREP;
2306  req = eio_stat(path, pri, fiber_eio_cb, &e_eio);
2307  FBR_EIO_WAIT;
2308  FBR_EIO_RESULT_CHECK;
2309  st = (EIO_STRUCT_STAT *)req->ptr2;
2310  memcpy(statdata, st, sizeof(*st));
2311  return req->result;
2312 }
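/* Usage sketch (not part of the original source): statdata is copied out of
 * the buffer eio allocates internally (req->ptr2), so a stack variable on the
 * caller's side is sufficient:
 */
#if 0
        EIO_STRUCT_STAT st;
        if (0 == fbr_eio_stat(FBR_A_ "/etc/passwd", &st, 0))
                fbr_log_d(FBR_A_ "size: %lld", (long long)st.st_size);
#endif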
2313 
2314 int fbr_eio_lstat(FBR_P_ const char *path, EIO_STRUCT_STAT *statdata, int pri)
2315 {
2316  EIO_STRUCT_STAT *st;
2317  FBR_EIO_PREP;
2318  req = eio_lstat(path, pri, fiber_eio_cb, &e_eio);
2319  FBR_EIO_WAIT;
2320  FBR_EIO_RESULT_CHECK;
2321  st = (EIO_STRUCT_STAT *)req->ptr2;
2322  memcpy(statdata, st, sizeof(*st));
2323  return req->result;
2324 }
2325 
2326 int fbr_eio_fstat(FBR_P_ int fd, EIO_STRUCT_STAT *statdata, int pri)
2327 {
2328  EIO_STRUCT_STAT *st;
2329  FBR_EIO_PREP;
2330  req = eio_fstat(fd, pri, fiber_eio_cb, &e_eio);
2331  FBR_EIO_WAIT;
2332  FBR_EIO_RESULT_CHECK;
2333  st = (EIO_STRUCT_STAT *)req->ptr2;
2334  memcpy(statdata, st, sizeof(*st));
2335  return req->result;
2336 }
2337 
2338 int fbr_eio_statvfs(FBR_P_ const char *path, EIO_STRUCT_STATVFS *statdata,
2339  int pri)
2340 {
2341  EIO_STRUCT_STATVFS *st;
2342  FBR_EIO_PREP;
2343  req = eio_statvfs(path, pri, fiber_eio_cb, &e_eio);
2344  FBR_EIO_WAIT;
2345  FBR_EIO_RESULT_CHECK;
2346  st = (EIO_STRUCT_STATVFS *)req->ptr2;
2347  memcpy(statdata, st, sizeof(*st));
2348  return req->result;
2349 }
2350 
2351 int fbr_eio_fstatvfs(FBR_P_ int fd, EIO_STRUCT_STATVFS *statdata, int pri)
2352 {
2353  EIO_STRUCT_STATVFS *st;
2354  FBR_EIO_PREP;
2355  req = eio_fstatvfs(fd, pri, fiber_eio_cb, &e_eio);
2356  FBR_EIO_WAIT;
2357  FBR_EIO_RESULT_CHECK;
2358  st = (EIO_STRUCT_STATVFS *)req->ptr2;
2359  memcpy(statdata, st, sizeof(*st));
2360  return req->result;
2361 }
2362 
2363 int fbr_eio_sendfile(FBR_P_ int out_fd, int in_fd, off_t in_offset,
2364  size_t length, int pri)
2365 {
2366  FBR_EIO_PREP;
2367  req = eio_sendfile(out_fd, in_fd, in_offset, length, pri, fiber_eio_cb,
2368  &e_eio);
2369  FBR_EIO_WAIT;
2370  FBR_EIO_RESULT_RET;
2371 }
2372 
2373 int fbr_eio_readahead(FBR_P_ int fd, off_t offset, size_t length, int pri)
2374 {
2375  FBR_EIO_PREP;
2376  req = eio_readahead(fd, offset, length, pri, fiber_eio_cb, &e_eio);
2377  FBR_EIO_WAIT;
2378  FBR_EIO_RESULT_RET;
2379 }
2380 
2381 int fbr_eio_syncfs(FBR_P_ int fd, int pri)
2382 {
2383  FBR_EIO_PREP;
2384  req = eio_syncfs(fd, pri, fiber_eio_cb, &e_eio);
2385  FBR_EIO_WAIT;
2386  FBR_EIO_RESULT_RET;
2387 }
2388 
2389 int fbr_eio_sync_file_range(FBR_P_ int fd, off_t offset, size_t nbytes,
2390  unsigned int flags, int pri)
2391 {
2392  FBR_EIO_PREP;
2393  req = eio_sync_file_range(fd, offset, nbytes, flags, pri, fiber_eio_cb,
2394  &e_eio);
2395  FBR_EIO_WAIT;
2396  FBR_EIO_RESULT_RET;
2397 }
2398 
2399 int fbr_eio_fallocate(FBR_P_ int fd, int mode, off_t offset, off_t len, int pri)
2400 {
2401  FBR_EIO_PREP;
2402  req = eio_fallocate(fd, mode, offset, len, pri, fiber_eio_cb, &e_eio);
2403  FBR_EIO_WAIT;
2404  FBR_EIO_RESULT_RET;
2405 }
2406 
2407 static void custom_execute_cb(eio_req *req)
2408 {
2409  struct fbr_ev_eio *ev = req->data;
2410  req->result = ev->custom_func(ev->custom_arg);
2411 }
2412 
2413 eio_ssize_t fbr_eio_custom(FBR_P_ fbr_eio_custom_func_t func, void *data,
2414  int pri)
2415 {
2416  FBR_EIO_PREP;
2417  e_eio.custom_func = func;
2418  e_eio.custom_arg = data;
2419  req = eio_custom(custom_execute_cb, pri, fiber_eio_cb, &e_eio);
2420  FBR_EIO_WAIT;
2421  FBR_EIO_RESULT_RET;
2422 }
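/* Usage sketch (not part of the original source): fbr_eio_custom() runs an
 * arbitrary blocking function on an eio pool thread while only the calling
 * fiber waits for the result. The helper below is hypothetical and assumes,
 * as custom_execute_cb() above suggests, that the callback takes a void *
 * and returns eio_ssize_t; sleep() would need <unistd.h>.
 */
#if 0
static eio_ssize_t blocking_work(void *data)
{
        /* runs on an eio pool thread, may block freely */
        (void)data;
        sleep(1);
        return 42;
}

/* inside a fiber: */
eio_ssize_t rv = fbr_eio_custom(FBR_A_ blocking_work, NULL, 0);
#endif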
2423 
2424 #else
2425 
2426 void fbr_eio_init(FBR_PU)
2427 {
2428  fbr_log_e(FBR_A_ "libevfibers: libeio support is not compiled in");
2429  abort();
2430 }
2431 
2432 #endif