#define _GNU_SOURCE #include "ytdlsb-tasks.h" #include #include #include #include #include #include #include #include static void get_now(struct timespec *t){ CKAP(clock_gettime(CLOCK_MONOTONIC, t)); } static void set_time_ms(struct timespec *dst, int ms){ dst->tv_sec = ms / 1000; dst->tv_nsec = ms % 1000 * 1000 * 1000; } static int get_time_ms(struct timespec *src){ if((INT_MAX - 999) / 1000 < src->tv_sec){ return (INT_MAX - 999) / 1000 * 1000; // lazy saturation } return (int)src->tv_sec * 1000 + src->tv_nsec / 1000 / 1000; } static void copy_time(struct timespec *dst, struct timespec *src){ memcpy(dst, src, sizeof(*dst)); } static void add_time(struct timespec *dst, struct timespec *src){ dst->tv_sec += src->tv_sec; dst->tv_nsec -= 1000 * 1000 * 1000; dst->tv_nsec += src->tv_nsec; if(dst->tv_nsec < 0) dst->tv_nsec += 1000 * 1000 * 1000; else dst->tv_sec += 1; } static void sub_time(struct timespec *dst, struct timespec *src){ dst->tv_sec -= src->tv_sec; dst->tv_nsec -= src->tv_nsec; if(src->tv_nsec < 0){ dst->tv_sec -= 1; dst->tv_nsec += 1000 * 1000 * 1000; } } static int lt_time(struct timespec *lt, struct timespec *gt){ return lt->tv_sec < gt->tv_sec || lt->tv_sec == gt->tv_sec && lt->tv_nsec < gt->tv_nsec; } int ytdlsb_task_timeout_ms(struct ytdlsb_task *task, int ms){ struct timespec a; set_time_ms(&a, ms); get_now(&task->timeout); add_time(&task->timeout, &a); return 0; } int ytdlsb_task_timeout_ms_min(struct ytdlsb_task *task, int ms){ struct timespec a, now; get_now(&now); set_time_ms(&a, ms); add_time(&a, &now); if(get_time_ms(&task->timeout) == 0 || lt_time(&a, &task->timeout)){ copy_time(&task->timeout, &a); } return 0; } int ytdlsb_task_timeout_unset(struct ytdlsb_task *task){ set_time_ms(&task->timeout, 0); return 0; } int ytdlsb_tasks_step(struct ytdlsb_tasks *tasks){ int ret = -1; struct pollfd *fds = NULL; size_t *fdso = NULL; struct timespec now; size_t last_fd = 0; get_now(&now); fdso = CKAR(reallocarray(NULL, tasks->task_num, sizeof(*fdso))); int timeout = -1; nfds_t nfds = 0; for(size_t i = 0; i < tasks->task_num; i++){ struct ytdlsb_task *t = &tasks->tasks[i]; if(!t->process){ fdso[i] = nfds; continue; } if(get_time_ms(&t->timeout) == 0){ // do nothing }else if(lt_time(&t->timeout, &now)){ timeout = 0; }else{ struct timespec tdiff; copy_time(&tdiff, &t->timeout); sub_time(&tdiff, &now); int ms = get_time_ms(&tdiff); if(timeout == -1 || timeout < ms) timeout = ms; } fdso[i] = nfds + t->pollfd_num; if(t->pollfd_num){ nfds += t->pollfd_num; fds = CKAR(reallocarray(fds, nfds, sizeof(*fds))); for(size_t j = 0; j < t->pollfd_num; j++){ struct pollfd *dst = &fds[nfds - t->pollfd_num + j]; dst->fd = t->pollfd[j].fd; dst->events = t->pollfd[j].events; } } } CKP(err, poll(fds, nfds, timeout)); get_now(&now); for(size_t i = 0; i < tasks->task_num; i++){ struct ytdlsb_task *t = &tasks->tasks[i]; int want_call = 0; if(get_time_ms(&t->timeout) && lt_time(&t->timeout, &now)) want_call |= 1; for(size_t j = last_fd; !want_call && j < fdso[i]; j++){ short er = fds[j].events; short re = fds[j].revents; CK(err, re & POLLNVAL, == 0); want_call |= re&POLLHUP || re&POLLERR; want_call |= er&POLLIN && re&POLLIN; want_call |= er&POLLOUT && re&POLLOUT; want_call |= er&POLLPRI && re&POLLPRI; } if(t->process && want_call){ CKP(err, t->process(t)); } last_fd = fdso[i]; } ret = 0; err: free(fds); free(fdso); return ret; } struct ytdlsb_task_event{ int pipe[2]; int (*cb)(void *); void *data; }; static int ytdlsb_task_event_cb(struct ytdlsb_task *t){ struct ytdlsb_task_event *e = t->data; ssize_t rs; while(1){ int buf; rs = read(e->pipe[0], &buf, sizeof(buf)); if(rs == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) break; CK_MSG(err, rs, > 0, "pipe read error"); } return e->cb(e->data) < 0 ? -1 : 0; err: return -1; } int ytdlsb_task_event_init( struct ytdlsb_task *task, int (*cb)(void *), void *data ){ int ret = -1; struct ytdlsb_task_event *e; e = CKAR(malloc(sizeof(*e))); CKP(err, pipe2(e->pipe, O_CLOEXEC|O_NONBLOCK)); e->cb = cb; e->data = data; task->process = ytdlsb_task_event_cb; task->pollfd = CKAR(malloc(sizeof(*task->pollfd))); task->pollfd[0].fd = e->pipe[0]; task->pollfd[0].events = YTDLSB_POLLIN; task->pollfd_num = 1; task->data = e; e = NULL; ret = 0; err: free(e); return ret; } void ytdlsb_task_event_destroy(struct ytdlsb_task *task){ struct ytdlsb_task_event *e = task->data; CKAP(close(e->pipe[0])); CKAP(close(e->pipe[1])); free(e); free(task->pollfd); } int ytdlsb_task_event_wake(struct ytdlsb_task *task){ struct ytdlsb_task_event *e = task->data; ssize_t ws = write(e->pipe[1], "\n", 1); if(ws == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) return 0; CK_MSG(err, ws, > 0, "pipe write error"); return 0; err: return -1; } int ytdlsb_task_fdto_from_curl( struct ytdlsb_task *task, size_t off, int overwrite_timeout, CURLM* cm ){ int ret = -1; unsigned int ncfds = 0; struct curl_waitfd *cfds = NULL; long timeout; CKZ(err, curl_multi_timeout(cm, &timeout)); if(overwrite_timeout && timeout < 0){ CKP(err, ytdlsb_task_timeout_unset(task)); }else if(overwrite_timeout){ CKP(err, ytdlsb_task_timeout_ms(task, TRY_NUMCAST(err, int, timeout))); }else{ CKP(err, ytdlsb_task_timeout_ms_min(task, TRY_NUMCAST(err, int, timeout))); } CKZ(err, curl_multi_waitfds(cm, NULL, 0, &ncfds)); if(ncfds == 0){ task->pollfd_num = off; return 0; } cfds = CKAR(reallocarray(NULL, ncfds, sizeof(*cfds))); CKZ(err, curl_multi_waitfds(cm, cfds, ncfds, NULL)); task->pollfd = CKAR(reallocarray(task->pollfd, off+ncfds, sizeof(*task->pollfd))); for(size_t i = 0; i < ncfds; i++){ task->pollfd[off+i].fd = cfds[i].fd; task->pollfd[off+i].events = 0; #define PT(t) if(cfds[i].fd & CURL_WAIT_POLL ## t) \ task->pollfd[off+i].events |= YTDLSB_POLL ## t PT(IN); PT(PRI); PT(OUT); #undef PT } task->pollfd_num = off + ncfds; ret = ncfds; err: free(cfds); return ret; }