diff --git a/core/master_utils.c b/core/master_utils.c index 5eece040..bbdfb046 100644 --- a/core/master_utils.c +++ b/core/master_utils.c @@ -852,8 +852,10 @@ void uwsgi_send_stats(int fd) { goto end0; if (uwsgi_stats_keylong_comma(us, "signals", (unsigned long long) uwsgi.workers[i + 1].signals)) goto end0; +/* if (uwsgi_stats_keylong_comma(us, "static_offload_threads", (unsigned long long) uwsgi.workers[i + 1].static_offload_threads)) goto end0; +*/ if (uwsgi.workers[i + 1].cheaped) { if (uwsgi_stats_keyval_comma(us, "status", "cheap")) diff --git a/core/protocol.c b/core/protocol.c index d92fd375..26c38241 100644 --- a/core/protocol.c +++ b/core/protocol.c @@ -1908,42 +1908,23 @@ int uwsgi_real_file_serve(struct wsgi_request *wsgi_req, char *real_filename, si headers_vec[3].iov_len = 48; wsgi_req->headers_size += wsgi_req->socket->proto_writev_header(wsgi_req, headers_vec, 4); wsgi_req->header_cnt += 2; + + // Ok, the file must be transferred from uWSGI + if (!wsgi_req->socket->can_offload) { + if (!uwsgi_offload_request_do(wsgi_req, real_filename, st->st_size)) goto done; + } + wsgi_req->sendfile_fd = open(real_filename, O_RDONLY); wsgi_req->response_size += uwsgi_sendfile(wsgi_req); // here we need to close the sendfile fd (no-GC involved) close(wsgi_req->sendfile_fd); } - +done: wsgi_req->status = 200; return 0; } -void *uwsgi_static_offload_thread(void *req) { - - struct uwsgi_offload_request *uof_req = (struct uwsgi_offload_request *) req; - - uwsgi_real_file_serve(&uof_req->wsgi_req, uof_req->real_filename, uof_req->real_filename_len, &uof_req->st); - - // close the connection with the webserver - if (!uof_req->wsgi_req.fd_closed || !uof_req->wsgi_req.body_as_file) { - // NOTE, if we close the socket before receiving eventually sent data, socket layer will send a RST - uof_req->wsgi_req.socket->proto_close(&uof_req->wsgi_req); - } - - // free buffer - free(uof_req->buffer); - // free hvec - free(uof_req->hvec); - free(uof_req); - - pthread_mutex_lock(&uwsgi.static_offload_thread_lock); - uwsgi.workers[uwsgi.mywid].static_offload_threads--; - pthread_mutex_unlock(&uwsgi.static_offload_thread_lock); - - return NULL; -} - int uwsgi_file_serve(struct wsgi_request *wsgi_req, char *document_root, uint16_t document_root_len, char *path_info, uint16_t path_info_len, int is_a_file) { @@ -1996,56 +1977,7 @@ int uwsgi_file_serve(struct wsgi_request *wsgi_req, char *document_root, uint16_ sse = sse->next; } - // Ok, the file must be served as static from uWSGI - if (uwsgi.static_offload_to_thread) { - pthread_mutex_lock(&uwsgi.static_offload_thread_lock); - uint64_t offload_thread_count = uwsgi.workers[uwsgi.mywid].static_offload_threads; - pthread_mutex_unlock(&uwsgi.static_offload_thread_lock); - - if (offload_thread_count > (uint64_t) uwsgi.static_offload_to_thread) { - uwsgi_log_verbose("OVERLOAD !!! unable to offload static file serving\n"); - return uwsgi_real_file_serve(wsgi_req, real_filename, real_filename_len, &st); - } - - struct uwsgi_offload_request *uor = uwsgi_malloc(sizeof(struct uwsgi_offload_request)); - - // buffer - uor->buffer = uwsgi_malloc(uwsgi.buffer_size); memcpy(uor->buffer, wsgi_req->buffer, uwsgi.buffer_size); - // iovec - uor->hvec = uwsgi_malloc(sizeof(struct iovec) * uwsgi.vec_size); memcpy(uor->hvec, wsgi_req->hvec, sizeof(struct iovec) * uwsgi.vec_size); - - // wsgi_req - memcpy(&uor->wsgi_req, wsgi_req, sizeof(struct wsgi_request)); - - uor->wsgi_req.buffer = uor->buffer; - uor->wsgi_req.hvec = uor->hvec; - - // stat - memcpy(&uor->st, &st, sizeof(struct stat)); - - // filename - memcpy(uor->real_filename, real_filename, real_filename_len); - uor->real_filename_len = real_filename_len; - uor->real_filename[uor->real_filename_len] = 0; - - // avoid closing the connection - wsgi_req->fd_closed = 1; - - pthread_mutex_lock(&uwsgi.static_offload_thread_lock); - uwsgi.workers[uwsgi.mywid].static_offload_threads++; - pthread_mutex_unlock(&uwsgi.static_offload_thread_lock); - - if (pthread_create(&uor->tid, &uwsgi.static_offload_thread_attr, uwsgi_static_offload_thread, (void *) uor)) { - uwsgi_error("pthread_create()"); - // bad condition, better to exit... - exit(1); - } - wsgi_req->status = -30; - return 0; - } - else { - return uwsgi_real_file_serve(wsgi_req, real_filename, real_filename_len, &st); - } + return uwsgi_real_file_serve(wsgi_req, real_filename, real_filename_len, &st); } return -1; diff --git a/core/sendfile.c b/core/sendfile.c index 5aa55b26..1c950581 100644 --- a/core/sendfile.c +++ b/core/sendfile.c @@ -1,9 +1,60 @@ -#ifdef UWSGI_SENDFILE - #include "uwsgi.h" extern struct uwsgi_server uwsgi; +/* + +enqueue a file transfer to the offload thread + +*/ + +int uwsgi_offload_request_do(struct wsgi_request *wsgi_req, char *filename, size_t len) { + + // avoid closing the connection + wsgi_req->fd_closed = 1; + + // fill offload request + struct uwsgi_offload_request uor; + uor.fd = open(filename, O_RDONLY | O_NONBLOCK); + if (uor.fd < 0) { + uwsgi_error_open(filename); + goto error; + } + uor.s = wsgi_req->poll.fd; + uor.pos = 0; + uor.len = len; + uor.written = 0; + + if (write(uwsgi.offload_thread->pipe[0], &uor, sizeof(struct uwsgi_offload_request)) != sizeof(struct uwsgi_offload_request)) { + goto error2; + } + + return 0; + +error2: + close(uor.fd); +error: + wsgi_req->fd_closed = 0; + return -1; +} + +static void uwsgi_offload_loop(struct uwsgi_thread *ut) { + + int i; + void *events = event_queue_alloc(uwsgi.static_offload_to_thread); + + for(;;) { + int nevents = event_queue_wait_multi(ut->queue, -1, events, uwsgi.static_offload_to_thread); + for (i=0;isendfile_fd; @@ -173,5 +224,3 @@ ssize_t uwsgi_do_sendfile(int sockfd, int filefd, size_t filesize, size_t chunk, #endif } - -#endif diff --git a/core/socket.c b/core/socket.c index 7440f0a9..81515cd9 100644 --- a/core/socket.c +++ b/core/socket.c @@ -1751,6 +1751,7 @@ setup_proto: uwsgi_sock->proto_writev_header = uwsgi_proto_uwsgi_writev_header; uwsgi_sock->proto_sendfile = NULL; uwsgi_sock->proto_close = uwsgi_proto_base_close; + uwsgi_sock->can_offload = 1; } else if (requested_protocol && (!strcmp("fastcgi", requested_protocol) || !strcmp("fcgi", requested_protocol))) { if (!strcmp(uwsgi.protocol, "fastcgi") || !strcmp(uwsgi.protocol, "fcgi")) { @@ -1779,6 +1780,7 @@ setup_proto: uwsgi_sock->proto_writev_header = uwsgi_proto_uwsgi_writev_header; uwsgi_sock->proto_sendfile = NULL; uwsgi_sock->proto_close = uwsgi_proto_base_close; + uwsgi_sock->can_offload = 1; } nextsock: uwsgi_sock = uwsgi_sock->next; diff --git a/core/utils.c b/core/utils.c index 6f7a195f..48b42645 100644 --- a/core/utils.c +++ b/core/utils.c @@ -4881,7 +4881,12 @@ struct uwsgi_thread *uwsgi_thread_new(void (*func)(struct uwsgi_thread *)) { ut->func = func; - if (pthread_create(&ut->tid, NULL, uwsgi_thread_run, ut)) { + pthread_attr_init(&ut->tattr); + pthread_attr_setdetachstate(&ut->tattr, PTHREAD_CREATE_DETACHED); + // 512K should be enough... + pthread_attr_setstacksize(&ut->tattr, 512 * 1024); + + if (pthread_create(&ut->tid, &ut->tattr, uwsgi_thread_run, ut)) { uwsgi_error("pthread_create()"); goto error; } diff --git a/core/uwsgi.c b/core/uwsgi.c index 10119b6a..c99a6989 100644 --- a/core/uwsgi.c +++ b/core/uwsgi.c @@ -2063,15 +2063,6 @@ int uwsgi_start(void *v_argv) { } } - // prepare offload threads - if (uwsgi.static_offload_to_thread) { - pthread_attr_init(&uwsgi.static_offload_thread_attr); - pthread_attr_setdetachstate(&uwsgi.static_offload_thread_attr, PTHREAD_CREATE_DETACHED); - // 512K should be enough... - pthread_attr_setstacksize(&uwsgi.static_offload_thread_attr, 512 * 1024); - pthread_mutex_init(&uwsgi.static_offload_thread_lock, NULL); - } - if (uwsgi.requested_max_fd) { uwsgi.rl.rlim_cur = uwsgi.requested_max_fd; uwsgi.rl.rlim_max = uwsgi.requested_max_fd; diff --git a/plugins/http/http.c b/plugins/http/http.c index d10af0b1..b6f64e6f 100644 --- a/plugins/http/http.c +++ b/plugins/http/http.c @@ -965,56 +965,6 @@ ssize_t uwsgi_http_ssl_recv(struct uwsgi_corerouter *cr, struct corerouter_sessi } -ssize_t uwsgi_http_nb_send(struct uwsgi_corerouter *cr, struct corerouter_session *cs, char *buf, size_t len) { - struct http_session *hs = (struct http_session *) cs; - ssize_t ret = write(cs->fd, buf, len); - if (ret == (ssize_t) len) { - if (cs->instance_stopped) { - event_queue_add_fd_read(cr->queue, cs->instance_fd); - cs->instance_stopped = 0; - } - if (cs->fd_state) { - event_queue_fd_write_to_read(cr->queue, cs->fd); - cs->fd_state = 0; - } - return len; - } - else if (ret == 0) { - return -1; - } - else if (ret < 0) { - if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINPROGRESS) { - if (cs->instance_fd != -1) { - event_queue_del_fd(cr->queue, cs->instance_fd, event_queue_read()); - cs->instance_stopped = 1; - } - if (!cs->fd_state) { - event_queue_fd_read_to_write(cr->queue, cs->fd); - cs->fd_state = 1; - } - errno = EINPROGRESS; - return -1; - } - uwsgi_error("write()"); - return -1; - } - - // partial write - hs->buffer_len -= ret; - memcpy(hs->buffer, hs->buffer + ret, hs->buffer_len); - if (cs->instance_fd != -1) { - event_queue_del_fd(cr->queue, cs->instance_fd, event_queue_read()); - cs->instance_stopped = 1; - } - if (!cs->fd_state) { - event_queue_fd_read_to_write(cr->queue, cs->fd); - cs->fd_state = 1; - } - - errno = EINPROGRESS; - return -1; -} - ssize_t uwsgi_http_ssl_send(struct uwsgi_corerouter *cr, struct corerouter_session *cs, char *buf, size_t len) { struct http_session *hs = (struct http_session *) cs; int ret = SSL_write(hs->ssl, buf, len); @@ -1096,6 +1046,58 @@ void uwsgi_ssl_close(struct uwsgi_corerouter *ucr, struct corerouter_session *cs } #endif + +ssize_t uwsgi_http_nb_send(struct uwsgi_corerouter *cr, struct corerouter_session *cs, char *buf, size_t len) { + struct http_session *hs = (struct http_session *) cs; + ssize_t ret = write(cs->fd, buf, len); + if (ret == (ssize_t) len) { + if (cs->instance_stopped) { + event_queue_add_fd_read(cr->queue, cs->instance_fd); + cs->instance_stopped = 0; + } + if (cs->fd_state) { + event_queue_fd_write_to_read(cr->queue, cs->fd); + cs->fd_state = 0; + } + return len; + } + else if (ret == 0) { + return -1; + } + else if (ret < 0) { + if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINPROGRESS) { + if (cs->instance_fd != -1) { + event_queue_del_fd(cr->queue, cs->instance_fd, event_queue_read()); + cs->instance_stopped = 1; + } + if (!cs->fd_state) { + event_queue_fd_read_to_write(cr->queue, cs->fd); + cs->fd_state = 1; + } + errno = EINPROGRESS; + return -1; + } + uwsgi_error("write()"); + return -1; + } + + // partial write + hs->buffer_len -= ret; + memcpy(hs->buffer, hs->buffer + ret, hs->buffer_len); + if (cs->instance_fd != -1) { + event_queue_del_fd(cr->queue, cs->instance_fd, event_queue_read()); + cs->instance_stopped = 1; + } + if (!cs->fd_state) { + event_queue_fd_read_to_write(cr->queue, cs->fd); + cs->fd_state = 1; + } + + errno = EINPROGRESS; + return -1; +} + + void http_alloc_session(struct uwsgi_corerouter *ucr, struct uwsgi_gateway_socket *ugs, struct corerouter_session *cs, struct sockaddr *sa, socklen_t s_len) { struct http_session *hs = (struct http_session *) cs; hs->ptr = hs->buffer; diff --git a/plugins/python/wsgi_headers.c b/plugins/python/wsgi_headers.c index 8bc63cac..937197d4 100644 --- a/plugins/python/wsgi_headers.c +++ b/plugins/python/wsgi_headers.c @@ -35,6 +35,13 @@ PyObject *py_uwsgi_spit(PyObject * self, PyObject * args) { wsgi_req->headers = NULL; } +#ifdef PYTHREE + if (wsgi_req->gc_tracker) { + Py_DECREF((PyObject *)wsgi_req->gc_tracker); + wsgi_req->gc_tracker = NULL; + } +#endif + // this must be done before headers management if (PyTuple_Size(args) > 2) { exc_info = PyTuple_GetItem(args, 2); @@ -72,6 +79,10 @@ PyObject *py_uwsgi_spit(PyObject * self, PyObject * args) { return PyErr_Format(PyExc_TypeError, "http status must be a string"); } +#ifdef PYTHREE + // this list maintains reference to encoded strings.. ugly hack, i know, but it works... + wsgi_req->gc_tracker = (void *) PyList_New(0); +#endif if (uwsgi.shared->options[UWSGI_OPTION_CGI_MODE] == 0) { base = 4; @@ -89,7 +100,10 @@ PyObject *py_uwsgi_spit(PyObject * self, PyObject * args) { wsgi_req->hvec[1].iov_len = 1; #ifdef PYTHREE if (self != Py_None) { - wsgi_req->hvec[2].iov_base = PyBytes_AsString(PyUnicode_AsASCIIString(head)); + PyObject *zero = PyUnicode_AsASCIIString(head); + wsgi_req->hvec[2].iov_base = PyBytes_AsString(zero); + PyList_Append((PyObject *) wsgi_req->gc_tracker, zero); + Py_DECREF(zero); } else { wsgi_req->hvec[2].iov_base = PyBytes_AsString(head); @@ -110,7 +124,10 @@ PyObject *py_uwsgi_spit(PyObject * self, PyObject * args) { wsgi_req->hvec[0].iov_len = 8; #ifdef PYTHREE if (self != Py_None) { - wsgi_req->hvec[1].iov_base = PyBytes_AsString(PyUnicode_AsASCIIString(head)); + PyObject *zero = PyUnicode_AsASCIIString(head); + wsgi_req->hvec[1].iov_base = PyBytes_AsString(zero); + PyList_Append((PyObject *) wsgi_req->gc_tracker, zero); + Py_DECREF(zero); } else { wsgi_req->hvec[1].iov_base = PyBytes_AsString(head); @@ -181,7 +198,10 @@ PyObject *py_uwsgi_spit(PyObject * self, PyObject * args) { #ifdef PYTHREE if (self != Py_None) { - wsgi_req->hvec[j].iov_base = PyBytes_AsString(PyUnicode_AsASCIIString(h_key)); + PyObject *zero = PyUnicode_AsASCIIString(h_key); + wsgi_req->hvec[j].iov_base = PyBytes_AsString(zero); + PyList_Append((PyObject *) wsgi_req->gc_tracker, zero); + Py_DECREF(zero); } else { wsgi_req->hvec[j].iov_base = PyBytes_AsString(h_key); @@ -195,7 +215,10 @@ PyObject *py_uwsgi_spit(PyObject * self, PyObject * args) { wsgi_req->hvec[j + 1].iov_len = H_SEP_SIZE; #ifdef PYTHREE if (self != Py_None) { - wsgi_req->hvec[j + 2].iov_base = PyBytes_AsString(PyUnicode_AsASCIIString(h_value)); + PyObject *zero = PyUnicode_AsASCIIString(h_value); + wsgi_req->hvec[j + 2].iov_base = PyBytes_AsString(zero); + PyList_Append((PyObject *) wsgi_req->gc_tracker, zero); + Py_DECREF(zero); } else { wsgi_req->hvec[j + 2].iov_base = PyBytes_AsString(h_value); @@ -288,6 +311,13 @@ int uwsgi_python_do_send_headers(struct wsgi_request *wsgi_req) { wsgi_req->headers = NULL; } +#ifdef PYTHREE + if (wsgi_req->gc_tracker) { + Py_DECREF((PyObject *)wsgi_req->gc_tracker); + wsgi_req->gc_tracker = NULL; + } +#endif + if (wsgi_req->write_errors > uwsgi.write_errors_tolerance && !uwsgi.disable_write_exception) { uwsgi_py_write_set_exception(wsgi_req); return -1; diff --git a/uwsgi.h b/uwsgi.h index 860a718f..602a8da8 100644 --- a/uwsgi.h +++ b/uwsgi.h @@ -636,6 +636,8 @@ struct uwsgi_socket { int *retry; + int can_offload; + // this is a special map for having socket->thread mapping int *fd_threads; @@ -1007,6 +1009,7 @@ struct wsgi_request { int status; void *status_header; void *headers; + void *gc_tracker; size_t response_size; ssize_t headers_size; @@ -1487,8 +1490,7 @@ struct uwsgi_server { int check_static_docroot; int static_offload_to_thread; - pthread_attr_t static_offload_thread_attr; - pthread_mutex_t static_offload_thread_lock; + struct uwsgi_thread *offload_thread; char *daemonize; char *daemonize2; @@ -1851,17 +1853,6 @@ struct uwsgi_server { }; -struct uwsgi_offload_request { - pthread_t tid; - char *buffer; - struct iovec *hvec; - char real_filename[PATH_MAX+1]; - size_t real_filename_len; - struct stat st; - struct wsgi_request wsgi_req; -}; - - struct uwsgi_rpc { char name[0xff]; void *func; @@ -2092,8 +2083,6 @@ struct uwsgi_worker { uint64_t avg_response_time; - uint64_t static_offload_threads; - struct uwsgi_core *cores; char name[0xff]; @@ -3314,6 +3303,7 @@ void uwsgi_alarms_init(); struct uwsgi_thread { pthread_t tid; + pthread_attr_t tattr; int pipe[2]; int queue; ssize_t rlen; @@ -3329,6 +3319,17 @@ struct uwsgi_thread { }; struct uwsgi_thread *uwsgi_thread_new(void (*)(struct uwsgi_thread *)); +struct uwsgi_offload_request { + int s; + int fd; + off_t pos; + size_t len; + size_t written; +}; + +struct uwsgi_thread *uwsgi_offload_thread_start(void); +int uwsgi_offload_request_do(struct wsgi_request *, char *, size_t); + void uwsgi_check_emperor(void); #ifdef UWSGI_AS_SHARED_LIBRARY int uwsgi_init(int, char **, char **);