fixed leak in python3 headers management, prepare for truly working static file serving offload

This commit is contained in:
roberto@quantal64
2012-10-01 19:49:04 +02:00
parent 248741a81f
commit 3beead6459
9 changed files with 173 additions and 159 deletions

View File

@@ -852,8 +852,10 @@ void uwsgi_send_stats(int fd) {
goto end0;
if (uwsgi_stats_keylong_comma(us, "signals", (unsigned long long) uwsgi.workers[i + 1].signals))
goto end0;
/*
if (uwsgi_stats_keylong_comma(us, "static_offload_threads", (unsigned long long) uwsgi.workers[i + 1].static_offload_threads))
goto end0;
*/
if (uwsgi.workers[i + 1].cheaped) {
if (uwsgi_stats_keyval_comma(us, "status", "cheap"))

View File

@@ -1908,42 +1908,23 @@ int uwsgi_real_file_serve(struct wsgi_request *wsgi_req, char *real_filename, si
headers_vec[3].iov_len = 48;
wsgi_req->headers_size += wsgi_req->socket->proto_writev_header(wsgi_req, headers_vec, 4);
wsgi_req->header_cnt += 2;
// Ok, the file must be transferred from uWSGI
if (!wsgi_req->socket->can_offload) {
if (!uwsgi_offload_request_do(wsgi_req, real_filename, st->st_size)) goto done;
}
wsgi_req->sendfile_fd = open(real_filename, O_RDONLY);
wsgi_req->response_size += uwsgi_sendfile(wsgi_req);
// here we need to close the sendfile fd (no-GC involved)
close(wsgi_req->sendfile_fd);
}
done:
wsgi_req->status = 200;
return 0;
}
void *uwsgi_static_offload_thread(void *req) {
struct uwsgi_offload_request *uof_req = (struct uwsgi_offload_request *) req;
uwsgi_real_file_serve(&uof_req->wsgi_req, uof_req->real_filename, uof_req->real_filename_len, &uof_req->st);
// close the connection with the webserver
if (!uof_req->wsgi_req.fd_closed || !uof_req->wsgi_req.body_as_file) {
// NOTE, if we close the socket before receiving eventually sent data, socket layer will send a RST
uof_req->wsgi_req.socket->proto_close(&uof_req->wsgi_req);
}
// free buffer
free(uof_req->buffer);
// free hvec
free(uof_req->hvec);
free(uof_req);
pthread_mutex_lock(&uwsgi.static_offload_thread_lock);
uwsgi.workers[uwsgi.mywid].static_offload_threads--;
pthread_mutex_unlock(&uwsgi.static_offload_thread_lock);
return NULL;
}
int uwsgi_file_serve(struct wsgi_request *wsgi_req, char *document_root, uint16_t document_root_len, char *path_info, uint16_t path_info_len, int is_a_file) {
@@ -1996,56 +1977,7 @@ int uwsgi_file_serve(struct wsgi_request *wsgi_req, char *document_root, uint16_
sse = sse->next;
}
// Ok, the file must be served as static from uWSGI
if (uwsgi.static_offload_to_thread) {
pthread_mutex_lock(&uwsgi.static_offload_thread_lock);
uint64_t offload_thread_count = uwsgi.workers[uwsgi.mywid].static_offload_threads;
pthread_mutex_unlock(&uwsgi.static_offload_thread_lock);
if (offload_thread_count > (uint64_t) uwsgi.static_offload_to_thread) {
uwsgi_log_verbose("OVERLOAD !!! unable to offload static file serving\n");
return uwsgi_real_file_serve(wsgi_req, real_filename, real_filename_len, &st);
}
struct uwsgi_offload_request *uor = uwsgi_malloc(sizeof(struct uwsgi_offload_request));
// buffer
uor->buffer = uwsgi_malloc(uwsgi.buffer_size); memcpy(uor->buffer, wsgi_req->buffer, uwsgi.buffer_size);
// iovec
uor->hvec = uwsgi_malloc(sizeof(struct iovec) * uwsgi.vec_size); memcpy(uor->hvec, wsgi_req->hvec, sizeof(struct iovec) * uwsgi.vec_size);
// wsgi_req
memcpy(&uor->wsgi_req, wsgi_req, sizeof(struct wsgi_request));
uor->wsgi_req.buffer = uor->buffer;
uor->wsgi_req.hvec = uor->hvec;
// stat
memcpy(&uor->st, &st, sizeof(struct stat));
// filename
memcpy(uor->real_filename, real_filename, real_filename_len);
uor->real_filename_len = real_filename_len;
uor->real_filename[uor->real_filename_len] = 0;
// avoid closing the connection
wsgi_req->fd_closed = 1;
pthread_mutex_lock(&uwsgi.static_offload_thread_lock);
uwsgi.workers[uwsgi.mywid].static_offload_threads++;
pthread_mutex_unlock(&uwsgi.static_offload_thread_lock);
if (pthread_create(&uor->tid, &uwsgi.static_offload_thread_attr, uwsgi_static_offload_thread, (void *) uor)) {
uwsgi_error("pthread_create()");
// bad condition, better to exit...
exit(1);
}
wsgi_req->status = -30;
return 0;
}
else {
return uwsgi_real_file_serve(wsgi_req, real_filename, real_filename_len, &st);
}
return uwsgi_real_file_serve(wsgi_req, real_filename, real_filename_len, &st);
}
return -1;

View File

@@ -1,9 +1,60 @@
#ifdef UWSGI_SENDFILE
#include "uwsgi.h"
extern struct uwsgi_server uwsgi;
/*
enqueue a file transfer to the offload thread
*/
int uwsgi_offload_request_do(struct wsgi_request *wsgi_req, char *filename, size_t len) {
// avoid closing the connection
wsgi_req->fd_closed = 1;
// fill offload request
struct uwsgi_offload_request uor;
uor.fd = open(filename, O_RDONLY | O_NONBLOCK);
if (uor.fd < 0) {
uwsgi_error_open(filename);
goto error;
}
uor.s = wsgi_req->poll.fd;
uor.pos = 0;
uor.len = len;
uor.written = 0;
if (write(uwsgi.offload_thread->pipe[0], &uor, sizeof(struct uwsgi_offload_request)) != sizeof(struct uwsgi_offload_request)) {
goto error2;
}
return 0;
error2:
close(uor.fd);
error:
wsgi_req->fd_closed = 0;
return -1;
}
static void uwsgi_offload_loop(struct uwsgi_thread *ut) {
int i;
void *events = event_queue_alloc(uwsgi.static_offload_to_thread);
for(;;) {
int nevents = event_queue_wait_multi(ut->queue, -1, events, uwsgi.static_offload_to_thread);
for (i=0;i<nevents;i++) {
//int interesting_fd = event_queue_interesting_fd(events, i);
}
}
}
struct uwsgi_thread *uwsgi_offload_thread_start() {
return uwsgi_thread_new(uwsgi_offload_loop);
}
ssize_t uwsgi_sendfile(struct wsgi_request *wsgi_req) {
int fd = wsgi_req->sendfile_fd;
@@ -173,5 +224,3 @@ ssize_t uwsgi_do_sendfile(int sockfd, int filefd, size_t filesize, size_t chunk,
#endif
}
#endif

View File

@@ -1751,6 +1751,7 @@ setup_proto:
uwsgi_sock->proto_writev_header = uwsgi_proto_uwsgi_writev_header;
uwsgi_sock->proto_sendfile = NULL;
uwsgi_sock->proto_close = uwsgi_proto_base_close;
uwsgi_sock->can_offload = 1;
}
else if (requested_protocol && (!strcmp("fastcgi", requested_protocol) || !strcmp("fcgi", requested_protocol))) {
if (!strcmp(uwsgi.protocol, "fastcgi") || !strcmp(uwsgi.protocol, "fcgi")) {
@@ -1779,6 +1780,7 @@ setup_proto:
uwsgi_sock->proto_writev_header = uwsgi_proto_uwsgi_writev_header;
uwsgi_sock->proto_sendfile = NULL;
uwsgi_sock->proto_close = uwsgi_proto_base_close;
uwsgi_sock->can_offload = 1;
}
nextsock:
uwsgi_sock = uwsgi_sock->next;

View File

@@ -4881,7 +4881,12 @@ struct uwsgi_thread *uwsgi_thread_new(void (*func)(struct uwsgi_thread *)) {
ut->func = func;
if (pthread_create(&ut->tid, NULL, uwsgi_thread_run, ut)) {
pthread_attr_init(&ut->tattr);
pthread_attr_setdetachstate(&ut->tattr, PTHREAD_CREATE_DETACHED);
// 512K should be enough...
pthread_attr_setstacksize(&ut->tattr, 512 * 1024);
if (pthread_create(&ut->tid, &ut->tattr, uwsgi_thread_run, ut)) {
uwsgi_error("pthread_create()");
goto error;
}

View File

@@ -2063,15 +2063,6 @@ int uwsgi_start(void *v_argv) {
}
}
// prepare offload threads
if (uwsgi.static_offload_to_thread) {
pthread_attr_init(&uwsgi.static_offload_thread_attr);
pthread_attr_setdetachstate(&uwsgi.static_offload_thread_attr, PTHREAD_CREATE_DETACHED);
// 512K should be enough...
pthread_attr_setstacksize(&uwsgi.static_offload_thread_attr, 512 * 1024);
pthread_mutex_init(&uwsgi.static_offload_thread_lock, NULL);
}
if (uwsgi.requested_max_fd) {
uwsgi.rl.rlim_cur = uwsgi.requested_max_fd;
uwsgi.rl.rlim_max = uwsgi.requested_max_fd;

View File

@@ -965,56 +965,6 @@ ssize_t uwsgi_http_ssl_recv(struct uwsgi_corerouter *cr, struct corerouter_sessi
}
ssize_t uwsgi_http_nb_send(struct uwsgi_corerouter *cr, struct corerouter_session *cs, char *buf, size_t len) {
struct http_session *hs = (struct http_session *) cs;
ssize_t ret = write(cs->fd, buf, len);
if (ret == (ssize_t) len) {
if (cs->instance_stopped) {
event_queue_add_fd_read(cr->queue, cs->instance_fd);
cs->instance_stopped = 0;
}
if (cs->fd_state) {
event_queue_fd_write_to_read(cr->queue, cs->fd);
cs->fd_state = 0;
}
return len;
}
else if (ret == 0) {
return -1;
}
else if (ret < 0) {
if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINPROGRESS) {
if (cs->instance_fd != -1) {
event_queue_del_fd(cr->queue, cs->instance_fd, event_queue_read());
cs->instance_stopped = 1;
}
if (!cs->fd_state) {
event_queue_fd_read_to_write(cr->queue, cs->fd);
cs->fd_state = 1;
}
errno = EINPROGRESS;
return -1;
}
uwsgi_error("write()");
return -1;
}
// partial write
hs->buffer_len -= ret;
memcpy(hs->buffer, hs->buffer + ret, hs->buffer_len);
if (cs->instance_fd != -1) {
event_queue_del_fd(cr->queue, cs->instance_fd, event_queue_read());
cs->instance_stopped = 1;
}
if (!cs->fd_state) {
event_queue_fd_read_to_write(cr->queue, cs->fd);
cs->fd_state = 1;
}
errno = EINPROGRESS;
return -1;
}
ssize_t uwsgi_http_ssl_send(struct uwsgi_corerouter *cr, struct corerouter_session *cs, char *buf, size_t len) {
struct http_session *hs = (struct http_session *) cs;
int ret = SSL_write(hs->ssl, buf, len);
@@ -1096,6 +1046,58 @@ void uwsgi_ssl_close(struct uwsgi_corerouter *ucr, struct corerouter_session *cs
}
#endif
ssize_t uwsgi_http_nb_send(struct uwsgi_corerouter *cr, struct corerouter_session *cs, char *buf, size_t len) {
struct http_session *hs = (struct http_session *) cs;
ssize_t ret = write(cs->fd, buf, len);
if (ret == (ssize_t) len) {
if (cs->instance_stopped) {
event_queue_add_fd_read(cr->queue, cs->instance_fd);
cs->instance_stopped = 0;
}
if (cs->fd_state) {
event_queue_fd_write_to_read(cr->queue, cs->fd);
cs->fd_state = 0;
}
return len;
}
else if (ret == 0) {
return -1;
}
else if (ret < 0) {
if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINPROGRESS) {
if (cs->instance_fd != -1) {
event_queue_del_fd(cr->queue, cs->instance_fd, event_queue_read());
cs->instance_stopped = 1;
}
if (!cs->fd_state) {
event_queue_fd_read_to_write(cr->queue, cs->fd);
cs->fd_state = 1;
}
errno = EINPROGRESS;
return -1;
}
uwsgi_error("write()");
return -1;
}
// partial write
hs->buffer_len -= ret;
memcpy(hs->buffer, hs->buffer + ret, hs->buffer_len);
if (cs->instance_fd != -1) {
event_queue_del_fd(cr->queue, cs->instance_fd, event_queue_read());
cs->instance_stopped = 1;
}
if (!cs->fd_state) {
event_queue_fd_read_to_write(cr->queue, cs->fd);
cs->fd_state = 1;
}
errno = EINPROGRESS;
return -1;
}
void http_alloc_session(struct uwsgi_corerouter *ucr, struct uwsgi_gateway_socket *ugs, struct corerouter_session *cs, struct sockaddr *sa, socklen_t s_len) {
struct http_session *hs = (struct http_session *) cs;
hs->ptr = hs->buffer;

View File

@@ -35,6 +35,13 @@ PyObject *py_uwsgi_spit(PyObject * self, PyObject * args) {
wsgi_req->headers = NULL;
}
#ifdef PYTHREE
if (wsgi_req->gc_tracker) {
Py_DECREF((PyObject *)wsgi_req->gc_tracker);
wsgi_req->gc_tracker = NULL;
}
#endif
// this must be done before headers management
if (PyTuple_Size(args) > 2) {
exc_info = PyTuple_GetItem(args, 2);
@@ -72,6 +79,10 @@ PyObject *py_uwsgi_spit(PyObject * self, PyObject * args) {
return PyErr_Format(PyExc_TypeError, "http status must be a string");
}
#ifdef PYTHREE
// this list maintains reference to encoded strings.. ugly hack, i know, but it works...
wsgi_req->gc_tracker = (void *) PyList_New(0);
#endif
if (uwsgi.shared->options[UWSGI_OPTION_CGI_MODE] == 0) {
base = 4;
@@ -89,7 +100,10 @@ PyObject *py_uwsgi_spit(PyObject * self, PyObject * args) {
wsgi_req->hvec[1].iov_len = 1;
#ifdef PYTHREE
if (self != Py_None) {
wsgi_req->hvec[2].iov_base = PyBytes_AsString(PyUnicode_AsASCIIString(head));
PyObject *zero = PyUnicode_AsASCIIString(head);
wsgi_req->hvec[2].iov_base = PyBytes_AsString(zero);
PyList_Append((PyObject *) wsgi_req->gc_tracker, zero);
Py_DECREF(zero);
}
else {
wsgi_req->hvec[2].iov_base = PyBytes_AsString(head);
@@ -110,7 +124,10 @@ PyObject *py_uwsgi_spit(PyObject * self, PyObject * args) {
wsgi_req->hvec[0].iov_len = 8;
#ifdef PYTHREE
if (self != Py_None) {
wsgi_req->hvec[1].iov_base = PyBytes_AsString(PyUnicode_AsASCIIString(head));
PyObject *zero = PyUnicode_AsASCIIString(head);
wsgi_req->hvec[1].iov_base = PyBytes_AsString(zero);
PyList_Append((PyObject *) wsgi_req->gc_tracker, zero);
Py_DECREF(zero);
}
else {
wsgi_req->hvec[1].iov_base = PyBytes_AsString(head);
@@ -181,7 +198,10 @@ PyObject *py_uwsgi_spit(PyObject * self, PyObject * args) {
#ifdef PYTHREE
if (self != Py_None) {
wsgi_req->hvec[j].iov_base = PyBytes_AsString(PyUnicode_AsASCIIString(h_key));
PyObject *zero = PyUnicode_AsASCIIString(h_key);
wsgi_req->hvec[j].iov_base = PyBytes_AsString(zero);
PyList_Append((PyObject *) wsgi_req->gc_tracker, zero);
Py_DECREF(zero);
}
else {
wsgi_req->hvec[j].iov_base = PyBytes_AsString(h_key);
@@ -195,7 +215,10 @@ PyObject *py_uwsgi_spit(PyObject * self, PyObject * args) {
wsgi_req->hvec[j + 1].iov_len = H_SEP_SIZE;
#ifdef PYTHREE
if (self != Py_None) {
wsgi_req->hvec[j + 2].iov_base = PyBytes_AsString(PyUnicode_AsASCIIString(h_value));
PyObject *zero = PyUnicode_AsASCIIString(h_value);
wsgi_req->hvec[j + 2].iov_base = PyBytes_AsString(zero);
PyList_Append((PyObject *) wsgi_req->gc_tracker, zero);
Py_DECREF(zero);
}
else {
wsgi_req->hvec[j + 2].iov_base = PyBytes_AsString(h_value);
@@ -288,6 +311,13 @@ int uwsgi_python_do_send_headers(struct wsgi_request *wsgi_req) {
wsgi_req->headers = NULL;
}
#ifdef PYTHREE
if (wsgi_req->gc_tracker) {
Py_DECREF((PyObject *)wsgi_req->gc_tracker);
wsgi_req->gc_tracker = NULL;
}
#endif
if (wsgi_req->write_errors > uwsgi.write_errors_tolerance && !uwsgi.disable_write_exception) {
uwsgi_py_write_set_exception(wsgi_req);
return -1;

31
uwsgi.h
View File

@@ -636,6 +636,8 @@ struct uwsgi_socket {
int *retry;
int can_offload;
// this is a special map for having socket->thread mapping
int *fd_threads;
@@ -1007,6 +1009,7 @@ struct wsgi_request {
int status;
void *status_header;
void *headers;
void *gc_tracker;
size_t response_size;
ssize_t headers_size;
@@ -1487,8 +1490,7 @@ struct uwsgi_server {
int check_static_docroot;
int static_offload_to_thread;
pthread_attr_t static_offload_thread_attr;
pthread_mutex_t static_offload_thread_lock;
struct uwsgi_thread *offload_thread;
char *daemonize;
char *daemonize2;
@@ -1851,17 +1853,6 @@ struct uwsgi_server {
};
struct uwsgi_offload_request {
pthread_t tid;
char *buffer;
struct iovec *hvec;
char real_filename[PATH_MAX+1];
size_t real_filename_len;
struct stat st;
struct wsgi_request wsgi_req;
};
struct uwsgi_rpc {
char name[0xff];
void *func;
@@ -2092,8 +2083,6 @@ struct uwsgi_worker {
uint64_t avg_response_time;
uint64_t static_offload_threads;
struct uwsgi_core *cores;
char name[0xff];
@@ -3314,6 +3303,7 @@ void uwsgi_alarms_init();
struct uwsgi_thread {
pthread_t tid;
pthread_attr_t tattr;
int pipe[2];
int queue;
ssize_t rlen;
@@ -3329,6 +3319,17 @@ struct uwsgi_thread {
};
struct uwsgi_thread *uwsgi_thread_new(void (*)(struct uwsgi_thread *));
struct uwsgi_offload_request {
int s;
int fd;
off_t pos;
size_t len;
size_t written;
};
struct uwsgi_thread *uwsgi_offload_thread_start(void);
int uwsgi_offload_request_do(struct wsgi_request *, char *, size_t);
void uwsgi_check_emperor(void);
#ifdef UWSGI_AS_SHARED_LIBRARY
int uwsgi_init(int, char **, char **);