diff --git a/.gitignore b/.gitignore index 27fbb5a..4a8bfc6 100644 --- a/.gitignore +++ b/.gitignore @@ -4,3 +4,4 @@ Makefile cscope* objs +tags diff --git a/README b/README index 3b5a577..791ec98 100644 --- a/README +++ b/README @@ -242,6 +242,30 @@ Directives (). Default port is 80. + server_type + syntax: *server_type parameter* + + default: *none* + + context: *upstream* + + description: Specify the server type, currently only "redis" is + supported. + + keepalive + syntax: *keepalive num [single]* + + default: *none* + + context: *upstream* + + description: Enables keep-alive connections for the upstream. Num + specifies the max number of connections to keep open before, if + the max is reached it will close the least recently used connections. + + Single treats everything as a single host. With this flag connections + to different backends are treated as equal. + check syntax: *check interval=milliseconds [fall=count] [rise=count] [timeout=milliseconds] [type=tcp|ssl_hello|smtp|mysql|pop3|imap]* diff --git a/config b/config index b661fc9..f94a564 100644 --- a/config +++ b/config @@ -3,8 +3,8 @@ ngx_feature_name= ngx_feature_run=no ngx_feature_incs= ngx_feature_path="$ngx_addon_dir/modules $ngx_addon_dir/parsers $ngx_addon_dir" -ngx_feature_deps="$ngx_addon_dir/ngx_tcp.h $ngx_addon_dir/ngx_tcp_session.h $ngx_addon_dir/ngx_tcp_upstream.h $ngx_addon_dir/ngx_tcp_upstream_check.h $ngx_addon_dir/ngx_tcp_upstream_round_robin.h" -ngx_tcp_src="$ngx_addon_dir/ngx_tcp.c $ngx_addon_dir/ngx_tcp_core_module.c $ngx_addon_dir/ngx_tcp_session.c $ngx_addon_dir/ngx_tcp_access.c $ngx_addon_dir/ngx_tcp_log.c $ngx_addon_dir/ngx_tcp_upstream.c $ngx_addon_dir/ngx_tcp_upstream_round_robin.c $ngx_addon_dir/modules/ngx_tcp_generic_proxy_module.c $ngx_addon_dir/modules/ngx_tcp_websocket_proxy_module.c $ngx_addon_dir/modules/ngx_tcp_upstream_ip_hash_module.c $ngx_addon_dir/modules/ngx_tcp_upstream_busyness_module.c $ngx_addon_dir/ngx_tcp_upstream_check.c " +ngx_feature_deps="$ngx_addon_dir/ngx_tcp.h $ngx_addon_dir/ngx_tcp_session.h $ngx_addon_dir/ngx_tcp_upstream.h $ngx_addon_dir/ngx_tcp_upstream_check.h $ngx_addon_dir/ngx_tcp_upstream_round_robin.h $ngx_addon_dir/ngx_tcp_upstream_keepalive.h" +ngx_tcp_src="$ngx_addon_dir/ngx_tcp.c $ngx_addon_dir/ngx_tcp_core_module.c $ngx_addon_dir/ngx_tcp_session.c $ngx_addon_dir/ngx_tcp_access.c $ngx_addon_dir/ngx_tcp_log.c $ngx_addon_dir/ngx_tcp_upstream.c $ngx_addon_dir/ngx_tcp_upstream_round_robin.c $ngx_addon_dir/modules/ngx_tcp_generic_proxy_module.c $ngx_addon_dir/modules/ngx_tcp_websocket_proxy_module.c $ngx_addon_dir/modules/ngx_tcp_upstream_ip_hash_module.c $ngx_addon_dir/modules/ngx_tcp_upstream_busyness_module.c $ngx_addon_dir/ngx_tcp_upstream_check.c $ngx_addon_dir/ngx_tcp_upstream_keepalive.c $ngx_addon_dir/modules/ngx_tcp_monitor_server_module.c $ngx_addon_dir/modules/ngx_tcp_rpc_server_module.c" ngx_tcp_ssl_deps="$ngx_addon_dir/modules/ngx_tcp_ssl_module.h" ngx_tcp_ssl_src="$ngx_addon_dir/modules/ngx_tcp_ssl_module.c" ngx_tcp_parser_deps="$ngx_addon_dir/parsers/parser.h $ngx_addon_dir/parsers/http_request_parser.h $ngx_addon_dir/parsers/http_response_parser.h $ngx_addon_dir/parsers/smtp_response_parser.h" @@ -17,7 +17,7 @@ if [ $ngx_found = yes ]; then ngx_addon_name=ngx_tcp_module TCP_CORE_MODULES="ngx_tcp_module ngx_tcp_core_module ngx_tcp_upstream_module" - TCP_MODULES="ngx_tcp_proxy_module ngx_tcp_websocket_module ngx_tcp_upstream_ip_hash_module ngx_tcp_upstream_busyness_module" + TCP_MODULES="ngx_tcp_proxy_module ngx_tcp_websocket_module ngx_tcp_upstream_ip_hash_module ngx_tcp_upstream_busyness_module ngx_tcp_upstream_keepalive_module ngx_tcp_monitor_module ngx_tcp_rpc_module" NGX_ADDON_DEPS="$NGX_ADDON_DEPS $ngx_feature_deps $ngx_tcp_parser_deps" NGX_ADDON_SRCS="$NGX_ADDON_SRCS $ngx_tcp_src $ngx_tcp_parser_src" diff --git a/modules/ngx_tcp_generic_proxy_module.c b/modules/ngx_tcp_generic_proxy_module.c index d2a81a9..8d8ad27 100644 --- a/modules/ngx_tcp_generic_proxy_module.c +++ b/modules/ngx_tcp_generic_proxy_module.c @@ -32,6 +32,16 @@ static void *ngx_tcp_proxy_create_conf(ngx_conf_t *cf); static char *ngx_tcp_proxy_merge_conf(ngx_conf_t *cf, void *parent, void *child); +static ngx_keyval_t ngx_tcp_server_types[] = { + + { ngx_string("redis"), + ngx_string("quit") + }, + + { ngx_null_string, + ngx_null_string + } +}; static ngx_tcp_protocol_t ngx_tcp_generic_protocol = { @@ -301,18 +311,19 @@ ngx_tcp_upstream_init_proxy_handler(ngx_tcp_session_t *s, ngx_tcp_upstream_t *u) static void ngx_tcp_proxy_handler(ngx_event_t *ev) { - char *action, *recv_action, *send_action; - off_t *read_bytes, *write_bytes; - size_t size; - ssize_t n; - ngx_buf_t *b; - ngx_err_t err; - ngx_uint_t do_write, first_read; - ngx_connection_t *c, *src, *dst; - ngx_tcp_session_t *s; - ngx_tcp_proxy_conf_t *pcf; - ngx_tcp_proxy_ctx_t *pctx; - ngx_tcp_core_srv_conf_t *cscf; + char *action, *recv_action, *send_action; + off_t *read_bytes, *write_bytes; + size_t size; + ssize_t n; + ngx_buf_t *b; + ngx_err_t err; + ngx_uint_t do_write, first_read, i; + ngx_connection_t *c, *src, *dst; + ngx_tcp_session_t *s; + ngx_tcp_proxy_conf_t *pcf; + ngx_tcp_proxy_ctx_t *pctx; + ngx_tcp_core_srv_conf_t *cscf; + ngx_tcp_upstream_srv_conf_t *uscf; c = ev->data; s = c->data; @@ -380,6 +391,8 @@ ngx_tcp_proxy_handler(ngx_event_t *ev) /* SSL Need this */ if (s->connection->ssl) { first_read = 1; + } else { + first_read = 0; } #else first_read = 0; @@ -389,6 +402,9 @@ ngx_tcp_proxy_handler(ngx_event_t *ev) "tcp proxy handler: %d, #%d > #%d, time:%ui", do_write, src->fd, dst->fd, ngx_current_msec); + pcf = ngx_tcp_get_module_srv_conf(s, ngx_tcp_proxy_module); + uscf = pcf->upstream.upstream; + for ( ;; ) { if (do_write) { @@ -398,6 +414,30 @@ ngx_tcp_proxy_handler(ngx_event_t *ev) if (size && dst->write->ready) { c->log->action = send_action; + if (uscf->server_type.data) { + for (i = 0; ngx_tcp_server_types[i].key.data; i++) { + if (ngx_strncmp(uscf->server_type.data, + ngx_tcp_server_types[i].key.data, + ngx_min(uscf->server_type.len, + ngx_tcp_server_types[i].key.len)) == 0) { + break; + } + + } + if (ngx_tcp_server_types[i].value.data + && ngx_strncasecmp(b->pos, + ngx_tcp_server_types[i].value.data, + ngx_tcp_server_types[i].value.len) == 0) { + ngx_log_debug0(NGX_LOG_DEBUG_TCP, ev->log, 0, + "received quit, close session"); + + ngx_tcp_finalize_session(s); + return; + } + } else { + s->upstream->keepalive = 0; + } + n = dst->send(dst, b->pos, size); err = ngx_socket_errno; @@ -501,8 +541,6 @@ ngx_tcp_proxy_handler(ngx_event_t *ev) return; } - pcf = ngx_tcp_get_module_srv_conf(s, ngx_tcp_proxy_module); - if (c == s->connection) { ngx_add_timer(c->read, cscf->timeout); } diff --git a/modules/ngx_tcp_monitor_server_module.c b/modules/ngx_tcp_monitor_server_module.c new file mode 100644 index 0000000..3cd5b3f --- /dev/null +++ b/modules/ngx_tcp_monitor_server_module.c @@ -0,0 +1,837 @@ +/* + * Copyright (C) 2013 Shang Yuanchun + * + */ + +#include +#include +#include + +/* + * header: |---- 4 ----|-- 2 --|-- 2 --| + * length type padding + * + * all are little endian + * + */ +typedef struct ngx_tcp_monitor_header_s { + uint32_t length; + uint16_t type; + uint16_t spare0; +} __attribute__ ((packed)) ngx_tcp_monitor_header_t; + +#define HEADER_LENGTH sizeof(ngx_tcp_monitor_header_t) + +#define monitor_packet_size(ptr) (*(u_char *)(ptr) + \ + (*((u_char *)(ptr) + 1) << 8) + \ + (*((u_char *)(ptr) + 2) << 16) + \ + (*((u_char *)(ptr) + 3) << 24) ) + +#define MONITOR_TYPE_OFFSET offsetof(ngx_tcp_monitor_header_t, type) +#define monitor_packet_type(ptr) (*((u_char *)(ptr) + MONITOR_TYPE_OFFSET) + \ + (*((u_char *)(ptr) + MONITOR_TYPE_OFFSET + 1) << 8) ) + +#define set_monitor_packet_size(ptr, size) do { \ + *(u_char *)(ptr) = (size) & 0xff; \ + *((u_char *)(ptr) + 1) = ((size) >> 8) & 0xff; \ + *((u_char *)(ptr) + 2) = ((size) >> 16) & 0xff; \ + *((u_char *)(ptr) + 3) = ((size) >> 24) & 0xff; \ + } while(0) + +#define set_monitor_return_code(ptr, code) do { \ + *((u_char *)(ptr) + MONITOR_TYPE_OFFSET) = (code) & 0xff; \ + *((u_char *)(ptr) + MONITOR_TYPE_OFFSET + 1) = ((code) >> 8) & 0xff; \ + } while(0) + +#define PACKET_TYPE_JSON 1 +#define PACKET_TYPE_TLV 2 +#define PACKET_TYPE_BSON 3 +#define PACKET_TYPE_MSGPACK 4 + +typedef struct ngx_tcp_monitor_ctx_s { + ngx_peer_connection_t *upstream; + + // ngx_tcp_session_t's buffer is header_in + // request_body is the request body + ngx_buf_t *request_body; + ngx_uint_t request_len; + + ngx_buf_t *header_out; + + ngx_buf_t *upstream_request_header; + ngx_buf_t *upstream_request_tail; + + ngx_buf_t *upstream_response; +} ngx_tcp_monitor_ctx_t; + + +typedef struct ngx_tcp_monitor_conf_s { + ngx_tcp_upstream_conf_t upstream; + ngx_str_t url; + ngx_str_t queue_name; +} ngx_tcp_monitor_conf_t; + +static inline size_t ngx_get_num_size(ngx_uint_t i) +{ + size_t n = 0; + + do { + i /= 10; + n++; + } while (i > 0); + + return n; +} + +static void ngx_tcp_monitor_init_session(ngx_tcp_session_t *s); +static void ngx_tcp_monitor_init_upstream(ngx_connection_t *c, + ngx_tcp_session_t *s); +static void ngx_tcp_upstream_init_monitor_handler(ngx_tcp_session_t *s, + ngx_tcp_upstream_t *u); +static char *ngx_tcp_monitor_pass(ngx_conf_t *cf, ngx_command_t *cmd, void *conf); +static void ngx_tcp_monitor_client_read_handler(ngx_event_t *rev); +static void ngx_tcp_monitor_client_write_handler(ngx_event_t *wev); +static void ngx_tcp_monitor_upstream_read_handler(ngx_event_t *rev); +static void ngx_tcp_monitor_upstream_write_handler(ngx_event_t *wev); +static void *ngx_tcp_monitor_create_conf(ngx_conf_t *cf); +static char *ngx_tcp_monitor_merge_conf(ngx_conf_t *cf, void *parent, + void *child); +static ngx_int_t ngx_tcp_monitor_build_query(ngx_tcp_session_t *s, + ngx_buf_t **header, ngx_buf_t **tail); +static ngx_int_t ngx_tcp_monitor_build_response(ngx_tcp_session_t *s); + +static ngx_tcp_protocol_t ngx_tcp_monitor_protocol = { + + ngx_string("monitor_server"), + { 0, 0, 0, 0 }, + NGX_TCP_GENERIC_PROTOCOL, + ngx_tcp_monitor_init_session, + NULL, + NULL, + ngx_string("500 Internal server error" CRLF) + +}; + + +static ngx_command_t ngx_tcp_monitor_commands[] = { + + { ngx_string("monitor_pass"), + NGX_TCP_MAIN_CONF|NGX_TCP_SRV_CONF|NGX_CONF_TAKE1, + ngx_tcp_monitor_pass, + NGX_TCP_SRV_CONF_OFFSET, + 0, + NULL }, + + { ngx_string("queue_name"), + NGX_TCP_SRV_CONF|NGX_CONF_TAKE1, + ngx_conf_set_str_slot, + NGX_TCP_SRV_CONF_OFFSET, + offsetof(ngx_tcp_monitor_conf_t, queue_name), + NULL }, + + { ngx_string("monitor_connect_timeout"), + NGX_TCP_MAIN_CONF|NGX_TCP_SRV_CONF|NGX_CONF_TAKE1, + ngx_conf_set_msec_slot, + NGX_TCP_SRV_CONF_OFFSET, + offsetof(ngx_tcp_monitor_conf_t, upstream.connect_timeout), + NULL }, + + { ngx_string("monitor_read_timeout"), + NGX_TCP_MAIN_CONF|NGX_TCP_SRV_CONF|NGX_CONF_TAKE1, + ngx_conf_set_msec_slot, + NGX_TCP_SRV_CONF_OFFSET, + offsetof(ngx_tcp_monitor_conf_t, upstream.read_timeout), + NULL }, + + { ngx_string("monitor_send_timeout"), + NGX_TCP_MAIN_CONF|NGX_TCP_SRV_CONF|NGX_CONF_TAKE1, + ngx_conf_set_msec_slot, + NGX_TCP_SRV_CONF_OFFSET, + offsetof(ngx_tcp_monitor_conf_t, upstream.send_timeout), + NULL }, + + ngx_null_command +}; + + +static ngx_tcp_module_t ngx_tcp_monitor_module_ctx = { + &ngx_tcp_monitor_protocol, /* protocol */ + + NULL, /* create main configuration */ + NULL, /* init main configuration */ + + ngx_tcp_monitor_create_conf, /* create server configuration */ + ngx_tcp_monitor_merge_conf /* merge server configuration */ +}; + + +ngx_module_t ngx_tcp_monitor_module = { + NGX_MODULE_V1, + &ngx_tcp_monitor_module_ctx, /* module context */ + ngx_tcp_monitor_commands, /* module directives */ + NGX_TCP_MODULE, /* module type */ + NULL, /* init master */ + NULL, /* init module */ + NULL, /* init process */ + NULL, /* init thread */ + NULL, /* exit thread */ + NULL, /* exit process */ + NULL, /* exit master */ + NGX_MODULE_V1_PADDING +}; + + +static void +ngx_tcp_monitor_init_session(ngx_tcp_session_t *s) +{ + ngx_connection_t *c; + ngx_tcp_core_srv_conf_t *cscf; + ngx_tcp_monitor_ctx_t *ctx; + + c = s->connection; + + ngx_log_debug0(NGX_LOG_DEBUG_TCP, c->log, 0, "tcp monitor init session"); + + cscf = ngx_tcp_get_module_srv_conf(s, ngx_tcp_core_module); + + s->buffer = ngx_create_temp_buf(s->connection->pool, HEADER_LENGTH); + if (s->buffer == NULL) { + ngx_tcp_finalize_session(s); + return; + } + + s->out.len = 0; + + c->write->handler = ngx_tcp_monitor_client_write_handler; + c->read->handler = ngx_tcp_monitor_client_read_handler; + + ngx_add_timer(c->read, cscf->timeout); + + ctx = ngx_pcalloc(s->connection->pool, sizeof(ngx_tcp_monitor_ctx_t)); + if (ctx == NULL) { + ngx_tcp_finalize_session(s); + return; + } + + ngx_tcp_set_ctx(s, ctx, ngx_tcp_monitor_module); + + // We will call this after we receive data completely + // ngx_tcp_monitor_init_upstream(c, s); + + if (ngx_handle_read_event(c->read, 0) != NGX_OK) { + ngx_tcp_finalize_session(s); + return; + } + + return; +} + + +/* + * FIXME: 1. I am not sure below will block! + * 2. Server did not close connection currently! + * + */ +static void +ngx_tcp_monitor_client_read_handler(ngx_event_t *rev) +{ + ssize_t n, size; + ngx_int_t rc; + ngx_err_t err; + ngx_buf_t *b; + ngx_connection_t *c; + ngx_tcp_session_t *s; + ngx_tcp_monitor_ctx_t *pctx; + + c = rev->data; + s = c->data; + + ngx_log_debug1(NGX_LOG_DEBUG_TCP, rev->log, 0, + "tcp monitor client read handler: %d", c->fd); + + if (rev->timedout) { + c->log->action = "monitoring"; + + ngx_log_error(NGX_LOG_INFO, c->log, NGX_ETIMEDOUT, "monitor timed out"); + c->timedout = 1; + + ngx_tcp_finalize_session(s); + return; + } + + pctx = ngx_tcp_get_module_ctx(s, ngx_tcp_monitor_module); + if (pctx == NULL) { + ngx_tcp_finalize_session(s); + return; + } + + for ( ;; ) { + if (c->read->ready) { + + c->log->action = "client read: reading from client"; + if (s->bytes_read < (off_t)HEADER_LENGTH) { + size = HEADER_LENGTH - s->bytes_read; + b = s->buffer; + } else { + if (pctx->request_body == NULL) { + pctx->request_len = monitor_packet_size(s->buffer->start); + pctx->request_body = ngx_create_temp_buf(c->pool, + pctx->request_len); + } + size = pctx->request_len - s->bytes_read + HEADER_LENGTH; + b = pctx->request_body; + } + if (size < 0) { + ngx_log_error(NGX_LOG_ERR, c->log, 0, + "client data not correct, handler: %d", c->fd); + ngx_tcp_finalize_session(s); + return; + } + n = c->recv(c, b->last, size); + err = ngx_socket_errno; + + if (n == NGX_ERROR) { + ngx_log_error(NGX_LOG_ERR, c->log, err, "client read error"); + ngx_tcp_finalize_session(s); + return; + } + ngx_log_debug1(NGX_LOG_DEBUG_TCP, rev->log, 0, + "tcp monitor handler recv:%d", n); + + if (n == NGX_AGAIN || n == 0) { + break; + } + + if (n > 0) { + b->last += n; + s->bytes_read += n; + continue; + } + + if (n == NGX_ERROR) { + c->read->eof = 1; + } + } + + break; + } + + if (s->bytes_read == (off_t)(pctx->request_len + HEADER_LENGTH)) + { + ngx_log_error(NGX_LOG_DEBUG, c->log, 0, "read client data done"); + rc = ngx_tcp_monitor_build_query(s, &pctx->upstream_request_header, + &pctx->upstream_request_tail); + if (rc != NGX_OK) { + ngx_tcp_finalize_session(s); + return; + } + ngx_tcp_monitor_init_upstream(c, s); + return; + } + + if (ngx_handle_read_event(rev, 0) != NGX_OK) { + ngx_tcp_finalize_session(s); + return; + } +} + + +static void +ngx_tcp_monitor_client_write_handler(ngx_event_t *wev) +{ + ssize_t n, size; + ngx_connection_t *c; + ngx_tcp_session_t *s; + ngx_tcp_monitor_ctx_t *pctx; + ngx_err_t err; + + c = wev->data; + s = c->data; + + ngx_log_debug0(NGX_LOG_DEBUG_TCP, s->connection->log, 0, + "tcp monitor client write handler"); + + if (wev->timedout) { + c->log->action = "monitoring"; + + ngx_log_error(NGX_LOG_INFO, c->log, NGX_ETIMEDOUT, "monitor client send timed out"); + c->timedout = 1; + + ngx_tcp_finalize_session(s); + return; + } + + pctx = ngx_tcp_get_module_ctx(s, ngx_tcp_monitor_module); + ngx_log_debug1(NGX_LOG_DEBUG_TCP, wev->log, 0, + "tcp monitor client write handler: %d", c->fd); + + for ( ;; ) { + if (c->write->ready) { + c->log->action = "client send: sending to client"; + size = pctx->header_out->end - pctx->header_out->pos; + if (size <= 0) { + break; + } + n = c->send(c, pctx->header_out->pos, size); + err = ngx_socket_errno; + + if (n == NGX_ERROR) { + ngx_log_error(NGX_LOG_ERR, c->log, err, "monitor client send error"); + return; + } + if (n > 0) { + pctx->header_out->pos +=n; + continue; + } + } + break; + } + + if (pctx->header_out->pos == pctx->header_out->end) { + ngx_log_error(NGX_LOG_DEBUG, c->log, 0, "client send data done"); + ngx_tcp_finalize_session(s); + return; + } + + if (ngx_handle_write_event(wev, 0) != NGX_OK) { + ngx_tcp_finalize_session(s); + } +} + + +static void ngx_tcp_monitor_upstream_read_handler(ngx_event_t *rev) +{ + ssize_t n, size; + ngx_int_t rc; + ngx_err_t err; + ngx_connection_t *c; + ngx_tcp_session_t *s; + ngx_tcp_monitor_ctx_t *pctx; + + c = rev->data; + s = c->data; + + ngx_log_debug1(NGX_LOG_DEBUG_TCP, rev->log, 0, + "tcp monitor upstream read handler: %d", c->fd); + + if (rev->timedout) { + c->log->action = "monitoring"; + + ngx_log_error(NGX_LOG_INFO, c->log, NGX_ETIMEDOUT, "monitor timed out"); + c->timedout = 1; + + ngx_tcp_finalize_session(s); + return; + } + + pctx = ngx_tcp_get_module_ctx(s, ngx_tcp_monitor_module); + if (pctx->upstream_response == NULL) { + pctx->upstream_response = ngx_create_temp_buf(c->pool, ngx_pagesize); + if (pctx->upstream_response == NULL) { + ngx_tcp_finalize_session(s); + return; + } + s->bytes_read = 0; + } + + for ( ;; ) { + if (c->read->ready) { + c->log->action = "upstream read: reading from upstream"; + size = (pctx->upstream_response->end + - pctx->upstream_response->start) - s->bytes_read; + n = c->recv(c, pctx->upstream_response->last, size); + err = ngx_socket_errno; + + if (n == NGX_ERROR) { + ngx_log_error(NGX_LOG_ERR, c->log, err, "upstream read error"); + ngx_tcp_finalize_session(s); + return; + } + ngx_log_debug1(NGX_LOG_DEBUG_TCP, rev->log, 0, + "tcp monitor handler recv:%d", n); + + if (n == NGX_AGAIN || n == 0) { + break; + } + + if (n > 0) { + pctx->upstream_response->last += n; + s->bytes_read += n; + continue; + } + + if (n == NGX_ERROR) { + c->read->eof = 1; + } + } + break; + } + + if (c->read->eof || + (*(pctx->upstream_response->last - 1) == LF)) { + rc = ngx_tcp_monitor_build_response(s); + if (rc != NGX_OK) { + ngx_tcp_finalize_session(s); + return; + } + s->connection->write->handler(s->connection->write); + return; + } + + if (ngx_handle_read_event(rev, 0) != NGX_OK) { + ngx_tcp_finalize_session(s); + return; + } +} + + +static void ngx_tcp_monitor_upstream_write_handler(ngx_event_t *wev) +{ + ssize_t n, size; + ngx_connection_t *c; + ngx_tcp_session_t *s; + ngx_uint_t header_length; + ngx_uint_t tail_length; + ngx_tcp_monitor_ctx_t *pctx; + ngx_buf_t *b; + ngx_err_t err; + ngx_tcp_monitor_conf_t *pcf; + + c = wev->data; + s = c->data; + + ngx_log_debug0(NGX_LOG_DEBUG_TCP, s->connection->log, 0, + "tcp monitor upstream write handler"); + + if (wev->timedout) { + c->log->action = "monitoring"; + + ngx_log_error(NGX_LOG_INFO, c->log, NGX_ETIMEDOUT, "monitor upstream send timed out"); + c->timedout = 1; + + ngx_tcp_finalize_session(s); + return; + } + + pcf = ngx_tcp_get_module_srv_conf(s, ngx_tcp_monitor_module); + pctx = ngx_tcp_get_module_ctx(s, ngx_tcp_monitor_module); + if (pctx == NULL) { + ngx_tcp_finalize_session(s); + return; + } + + ngx_log_debug1(NGX_LOG_DEBUG_TCP, wev->log, 0, + "tcp monitor upstream write handler: %d", c->fd); + + header_length = pctx->upstream_request_header->end - + pctx->upstream_request_header->start; + tail_length = pctx->upstream_request_tail->end - + pctx->upstream_request_tail->start; + if (s->bytes_write == (off_t)(header_length + pctx->request_len + tail_length)) { + return; + } + + for ( ;; ) { + if (c->write->ready) { + c->log->action = "upstream send: sending to upstream server"; + size = header_length - s->bytes_write; + b = pctx->upstream_request_header; + if (size <= 0) { + size += pctx->request_len; + b = pctx->request_body; + } + if (size <= 0) { + size += tail_length; + b = pctx->upstream_request_tail; + } + if (size <= 0) { + break; + } + n = c->send(c, b->pos, size); + err = ngx_socket_errno; + + if (n == NGX_ERROR) { + ngx_log_error(NGX_LOG_ERR, c->log, err, "monitor upstream send error"); + return; + } + if (n > 0) { + b->pos +=n; + s->bytes_write += n; + continue; + } + } + break; + } + + if (s->bytes_write == (off_t)(header_length + pctx->request_len + tail_length)) { + ngx_log_error(NGX_LOG_DEBUG, c->log, 0, "upstream send data done"); + if (c->write->timer_set) { + ngx_del_timer(c->write); + } + ngx_add_timer(c->read, pcf->upstream.read_timeout); + if (ngx_handle_read_event(c->read, 0) != NGX_OK) { + ngx_tcp_finalize_session(s); + return; + } + return; + } + + if (ngx_handle_write_event(wev, 0) != NGX_OK) { + ngx_tcp_finalize_session(s); + return; + } +} + + +static ngx_int_t +ngx_tcp_monitor_build_query(ngx_tcp_session_t *s, ngx_buf_t **header, ngx_buf_t **tail) +{ + size_t len; + u_short packet_type; + ngx_tcp_monitor_conf_t *pcf; + ngx_tcp_monitor_ctx_t *pctx; + + pcf = ngx_tcp_get_module_srv_conf(s, ngx_tcp_monitor_module); + pctx = ngx_tcp_get_module_ctx(s, ngx_tcp_monitor_module); + packet_type = monitor_packet_type(s->buffer->start); + // FIXME: below is specific to redis protocol + // http://redis.io/topics/protocol + switch(packet_type) { + case PACKET_TYPE_JSON: + len = sizeof("*3" CRLF "$5" CRLF "LPUSH" CRLF "$") -1 + + ngx_get_num_size(pcf->queue_name.len) + + sizeof(CRLF) -1 + pcf->queue_name.len + + sizeof(CRLF "$") -1 + + ngx_get_num_size(pctx->request_len) + + sizeof(CRLF) - 1; + *header = ngx_create_temp_buf(s->connection->pool, len); + if (*header == NULL) { + return NGX_ERROR; + } + ngx_sprintf((*header)->last, "*3"CRLF"$5"CRLF"LPUSH"CRLF + "$%d"CRLF"%*s"CRLF"$%d"CRLF, + pcf->queue_name.len, + pcf->queue_name.len, + pcf->queue_name.data, + pctx->request_len); + len = sizeof(CRLF) -1; + *tail = ngx_create_temp_buf(s->connection->pool, len); + if (*tail == NULL) { + return NGX_ERROR; + } + ngx_memcpy((*tail)->last, CRLF, sizeof(CRLF) - 1); + return NGX_OK; + + default: + ngx_log_error(NGX_LOG_INFO, s->connection->log, 0, + "invalid monitor packet type: %hu", packet_type); + return NGX_ERROR; + } +} + + +static ngx_int_t ngx_tcp_monitor_build_response(ngx_tcp_session_t *s) +{ + u_char chr; + ngx_tcp_monitor_ctx_t *pctx; + + pctx = ngx_tcp_get_module_ctx(s, ngx_tcp_monitor_module); + set_monitor_packet_size(pctx->header_out->start, 0); + if (*(pctx->upstream_response->last - 1) != LF || + *(pctx->upstream_response->last - 2) != CR) { + return NGX_ERROR; + } + chr = *pctx->upstream_response->pos; + switch (chr) { + case '+': + case ':': + set_monitor_return_code(pctx->header_out->start, 0); + break; + case '-': + default: + set_monitor_return_code(pctx->header_out->start, 1); + break; + } + return NGX_OK; +} + + +static void +ngx_tcp_monitor_init_upstream(ngx_connection_t *c, ngx_tcp_session_t *s) +{ + ngx_tcp_upstream_t *u; + ngx_tcp_monitor_ctx_t *p; + ngx_tcp_monitor_conf_t *pcf; + + s->connection->log->action = "ngx_tcp_monitor_init_upstream"; + + pcf = ngx_tcp_get_module_srv_conf(s, ngx_tcp_monitor_module); + if (pcf->upstream.upstream == NULL) { + ngx_tcp_finalize_session(s); + return; + } + + if (ngx_tcp_upstream_create(s) != NGX_OK) { + ngx_tcp_finalize_session(s); + return; + } + + u = s->upstream; + + u->conf = &pcf->upstream; + + u->write_event_handler = ngx_tcp_upstream_init_monitor_handler; + u->read_event_handler = ngx_tcp_upstream_init_monitor_handler; + + p = ngx_tcp_get_module_ctx(s, ngx_tcp_monitor_module); + p->upstream = &u->peer; + + p->header_out = ngx_create_temp_buf(s->connection->pool, HEADER_LENGTH); + if (p->header_out == NULL) { + ngx_tcp_finalize_session(s); + return; + } + + ngx_tcp_upstream_init(s); + + return; +} + + +static void +ngx_tcp_upstream_init_monitor_handler(ngx_tcp_session_t *s, ngx_tcp_upstream_t *u) +{ + ngx_connection_t *c; + ngx_tcp_monitor_ctx_t *pctx; + ngx_tcp_monitor_conf_t *pcf; + + c = s->connection; + c->log->action = "ngx_tcp_upstream_init_monitor_handler"; + + ngx_log_debug0(NGX_LOG_DEBUG_TCP, s->connection->log, 0, + "tcp monitor upstream init monitor"); + + pcf = ngx_tcp_get_module_srv_conf(s, ngx_tcp_monitor_module); + + pctx = ngx_tcp_get_module_ctx(s, ngx_tcp_monitor_module); + + if (pcf == NULL || pctx == NULL) { + ngx_tcp_finalize_session(s); + return; + } + + pctx->upstream = &s->upstream->peer; + + c = pctx->upstream->connection; + if (c->read->timedout || c->write->timedout) { + ngx_tcp_upstream_next(s, u, NGX_TCP_UPSTREAM_FT_TIMEOUT); + return; + } + + if (ngx_tcp_upstream_check_broken_connection(s) != NGX_OK){ + ngx_tcp_upstream_next(s, u, NGX_TCP_UPSTREAM_FT_ERROR); + return; + } + + c->read->handler = ngx_tcp_monitor_upstream_read_handler; + c->write->handler = ngx_tcp_monitor_upstream_write_handler; + + ngx_add_timer(c->write, pcf->upstream.send_timeout); + + if (ngx_handle_write_event(c->write, 0) != NGX_OK) { + ngx_tcp_finalize_session(s); + return; + } + + c->write->handler(c->write); + + ngx_log_debug0(NGX_LOG_DEBUG_TCP, s->connection->log, 0, + "tcp monitor upstream init monitor done"); + + return; +} + + +static char * +ngx_tcp_monitor_pass(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) +{ + ngx_tcp_monitor_conf_t *pcf = conf; + + u_short port = 80; + ngx_str_t *value, *url = &pcf->url; + ngx_url_t u; + ngx_tcp_core_srv_conf_t *cscf; + + cscf = ngx_tcp_conf_get_module_srv_conf(cf, ngx_tcp_core_module); + + if (cscf->protocol && ngx_strncmp(cscf->protocol->name.data, + (u_char *)"tcp_generic", + sizeof("tcp_generic") - 1) != 0) { + + return "the protocol should be tcp_generic"; + } + + if (cscf->protocol == NULL) { + cscf->protocol = &ngx_tcp_monitor_protocol; + } + + if (pcf->upstream.upstream) { + return "is duplicate"; + } + + value = cf->args->elts; + + url = &value[1]; + + ngx_memzero(&u, sizeof(u)); + + u.url.len = url->len; + u.url.data = url->data; + u.default_port = port; + u.uri_part = 1; + u.no_resolve = 1; + + pcf->upstream.upstream = ngx_tcp_upstream_add(cf, &u, 0); + if (pcf->upstream.upstream == NULL) { + return NGX_CONF_ERROR; + } + + return NGX_CONF_OK; +} + + +static void * +ngx_tcp_monitor_create_conf(ngx_conf_t *cf) +{ + ngx_tcp_monitor_conf_t *pcf; + + pcf = ngx_pcalloc(cf->pool, sizeof(ngx_tcp_monitor_conf_t)); + if (pcf == NULL) { + return NULL; + } + + pcf->upstream.connect_timeout = NGX_CONF_UNSET_MSEC; + pcf->upstream.send_timeout = NGX_CONF_UNSET_MSEC; + pcf->upstream.read_timeout = NGX_CONF_UNSET_MSEC; + + return pcf; +} + + +static char * +ngx_tcp_monitor_merge_conf(ngx_conf_t *cf, void *parent, void *child) +{ + ngx_tcp_monitor_conf_t *prev = parent; + ngx_tcp_monitor_conf_t *conf = child; + + ngx_conf_merge_msec_value(conf->upstream.connect_timeout, + prev->upstream.connect_timeout, 60000); + + ngx_conf_merge_msec_value(conf->upstream.send_timeout, + prev->upstream.send_timeout, 60000); + + ngx_conf_merge_msec_value(conf->upstream.read_timeout, + prev->upstream.read_timeout, 60000); + + return NGX_CONF_OK; +} diff --git a/modules/ngx_tcp_rpc_server_module.c b/modules/ngx_tcp_rpc_server_module.c new file mode 100644 index 0000000..907304a --- /dev/null +++ b/modules/ngx_tcp_rpc_server_module.c @@ -0,0 +1,176 @@ +/* + * Copyright (C) 2013 Shang Yuanchun + * + */ + +#include +#include +#include + +/* + * header: |---- 4 ----|-- 2 --|-- 2 --| + * length type padding + * + * all are little endian + * + */ +typedef struct ngx_tcp_rpc_header_s { + uint32_t length; + uint32_t magic; + uint16_t type; + uint16_t version; + uint32_t spare0; +} __attribute__ ((packed)) ngx_tcp_rpc_header_t; + +#define HEADER_LENGTH sizeof(ngx_tcp_rpc_header_t) + +#define rpc_packet_size(ptr) (*(u_char *)(ptr) + \ + (*((u_char *)(ptr) + 1) << 8) + \ + (*((u_char *)(ptr) + 2) << 16) + \ + (*((u_char *)(ptr) + 3) << 24) ) + +#define RPC_TYPE_OFFSET offsetof(ngx_tcp_rpc_header_t, type) +#define rpc_packet_type(ptr) (*((u_char *)(ptr) + RPC_TYPE_OFFSET) + \ + (*((u_char *)(ptr) + RPC_TYPE_OFFSET + 1) << 8) ) + +#define PACKET_TYPE_JSON 1 +#define PACKET_TYPE_TLV 2 +#define PACKET_TYPE_BSON 3 +#define PACKET_TYPE_MSGPACK 4 + +typedef struct ngx_tcp_rpc_ctx_s { + ngx_peer_connection_t *upstream; + + // ngx_tcp_session_t's buffer is header_in + // request_body is the request body + ngx_buf_t *request_body; + ngx_uint_t request_len; + + ngx_buf_t *header_out; +} ngx_tcp_rpc_ctx_t; + + +typedef struct ngx_tcp_rpc_conf_s { + ngx_tcp_upstream_conf_t upstream; + ngx_int_t rpc_server; + ngx_str_t document_root; +} ngx_tcp_rpc_conf_t; + +static void ngx_tcp_rpc_init_session(ngx_tcp_session_t *s); +static void ngx_tcp_rpc_client_read_handler(ngx_event_t *rev); +static void ngx_tcp_rpc_client_write_handler(ngx_event_t *wev); +static void *ngx_tcp_rpc_create_conf(ngx_conf_t *cf); +static char *ngx_tcp_rpc_merge_conf(ngx_conf_t *cf, void *parent, + void *child); + +static ngx_tcp_protocol_t ngx_tcp_rpc_protocol = { + + ngx_string("rpc_server"), + { 0, 0, 0, 0 }, + NGX_TCP_GENERIC_PROTOCOL, + ngx_tcp_rpc_init_session, + NULL, + NULL, + ngx_string("500 Internal server error" CRLF) + +}; + + +static ngx_command_t ngx_tcp_rpc_commands[] = { + + { ngx_string("rpc_server"), + NGX_TCP_MAIN_CONF|NGX_TCP_SRV_CONF|NGX_CONF_TAKE1, + ngx_conf_set_flag_slot, + NGX_TCP_SRV_CONF_OFFSET, + offsetof(ngx_tcp_rpc_conf_t, rpc_server), + NULL }, + + { ngx_string("root"), + NGX_TCP_SRV_CONF|NGX_CONF_TAKE1, + ngx_conf_set_str_slot, + NGX_TCP_SRV_CONF_OFFSET, + offsetof(ngx_tcp_rpc_conf_t, document_root), + NULL }, + + ngx_null_command +}; + + +static ngx_tcp_module_t ngx_tcp_rpc_module_ctx = { + &ngx_tcp_rpc_protocol, /* protocol */ + + NULL, /* create main configuration */ + NULL, /* init main configuration */ + + ngx_tcp_rpc_create_conf, /* create server configuration */ + ngx_tcp_rpc_merge_conf /* merge server configuration */ +}; + + +ngx_module_t ngx_tcp_rpc_module = { + NGX_MODULE_V1, + &ngx_tcp_rpc_module_ctx, /* module context */ + ngx_tcp_rpc_commands, /* module directives */ + NGX_TCP_MODULE, /* module type */ + NULL, /* init master */ + NULL, /* init module */ + NULL, /* init process */ + NULL, /* init thread */ + NULL, /* exit thread */ + NULL, /* exit process */ + NULL, /* exit master */ + NGX_MODULE_V1_PADDING +}; + + +static void +ngx_tcp_rpc_init_session(ngx_tcp_session_t *s) +{ + return; +} + + +static void +ngx_tcp_rpc_client_read_handler(ngx_event_t *rev) +{ + return; +} + + +static void +ngx_tcp_rpc_client_write_handler(ngx_event_t *wev) +{ + return; +} + + +static void * +ngx_tcp_rpc_create_conf(ngx_conf_t *cf) +{ + ngx_tcp_rpc_conf_t *pcf; + + pcf = ngx_pcalloc(cf->pool, sizeof(ngx_tcp_rpc_conf_t)); + if (pcf == NULL) { + return NULL; + } + + return pcf; +} + + +#define unused(arg) (void)(arg) + +static char * +ngx_tcp_rpc_merge_conf(ngx_conf_t *cf, void *parent, void *child) +{ + ngx_tcp_rpc_conf_t *prev = parent; + ngx_tcp_rpc_conf_t *conf = child; + + unused(prev); + unused(conf); + + unused(ngx_tcp_rpc_client_read_handler); + unused(ngx_tcp_rpc_client_write_handler); + + return NGX_CONF_OK; +} diff --git a/ngx_tcp_upstream.c b/ngx_tcp_upstream.c index 2d1b419..94d93da 100644 --- a/ngx_tcp_upstream.c +++ b/ngx_tcp_upstream.c @@ -98,6 +98,13 @@ static ngx_command_t ngx_tcp_upstream_commands[] = { offsetof(ngx_tcp_upstream_main_conf_t, check_shm_size), NULL }, + { ngx_string("server_type"), + NGX_TCP_UPS_CONF|NGX_CONF_TAKE1, + ngx_conf_set_str_slot, + NGX_TCP_SRV_CONF_OFFSET, + offsetof(ngx_tcp_upstream_srv_conf_t, server_type), + NULL }, + ngx_null_command }; @@ -183,8 +190,10 @@ ngx_tcp_upstream_init(ngx_tcp_session_t *s) cln = ngx_tcp_cleanup_add(s, 0); cln->handler = ngx_tcp_upstream_cleanup; - cln->data = s; - u->cleanup = &cln->handler; + cln->data = s; + u->cleanup = &cln->handler; + + u->keepalive = 0; if (u->resolved == NULL) { @@ -339,7 +348,7 @@ ngx_tcp_upstream_connect(ngx_tcp_session_t *s, ngx_tcp_upstream_t *u) ngx_log_debug1(NGX_LOG_DEBUG_TCP, s->connection->log, 0, "tcp upstream connect: %d", rc); - if (rc != NGX_OK && rc != NGX_AGAIN) { + if (rc == NGX_ERROR) { ngx_log_error(NGX_LOG_ERR, s->connection->log, 0, "upstream servers are busy or encounter error!"); @@ -350,7 +359,19 @@ ngx_tcp_upstream_connect(ngx_tcp_session_t *s, ngx_tcp_upstream_t *u) return; } - /* rc == NGX_OK or rc == NGX_AGAIN */ + if (rc == NGX_BUSY) { + ngx_log_error(NGX_LOG_ERR, s->connection->log, 0, + "no live upstreams"); + ngx_tcp_upstream_next(s, u, NGX_TCP_UPSTREAM_FT_NOLIVE); + return; + } + + if (rc == NGX_DECLINED) { + ngx_tcp_upstream_next(s, u, NGX_TCP_UPSTREAM_FT_ERROR); + return; + } + + /* rc == NGX_OK or rc == NGX_AGAIN or rc == NGX_DONE */ if (u->peer.check_index != NGX_INVALID_CHECK_INDEX) { ngx_tcp_check_get_peer(u->peer.check_index); @@ -541,7 +562,7 @@ ngx_tcp_upstream_finalize_session(ngx_tcp_session_t *s, if (u->cleanup) { *u->cleanup = NULL; - u->cleanup = NULL; + u->cleanup = NULL; } if (u->state && u->state->response_sec) { @@ -559,7 +580,7 @@ ngx_tcp_upstream_finalize_session(ngx_tcp_session_t *s, u->peer.check_index = NGX_INVALID_CHECK_INDEX; } - if (u->peer.connection) { + if (u->peer.connection && !u->keepalive) { ngx_log_debug1(NGX_LOG_DEBUG_TCP, s->connection->log, 0, "close tcp upstream connection: %d", @@ -676,6 +697,7 @@ ngx_tcp_upstream_add(ngx_conf_t *cf, ngx_url_t *u, ngx_uint_t flags) uscf->no_port = u->no_port; #endif uscf->code.status_alive = 0; + uscf->server_type = (ngx_str_t) ngx_null_string; if (u->naddrs == 1) { uscf->servers = ngx_array_create(cf->pool, 1, diff --git a/ngx_tcp_upstream.h b/ngx_tcp_upstream.h index 4fdce67..7188b4d 100644 --- a/ngx_tcp_upstream.h +++ b/ngx_tcp_upstream.h @@ -120,6 +120,8 @@ struct ngx_tcp_upstream_srv_conf_s { ngx_uint_t return_code; ngx_uint_t status_alive; } code; + + ngx_str_t server_type; }; @@ -160,6 +162,8 @@ struct ngx_tcp_upstream_s { ngx_tcp_upstream_resolved_t *resolved; ngx_tcp_upstream_state_t *state; ngx_tcp_cleanup_pt *cleanup; + + unsigned keepalive:1; }; diff --git a/ngx_tcp_upstream_keepalive.c b/ngx_tcp_upstream_keepalive.c new file mode 100644 index 0000000..21a61e7 --- /dev/null +++ b/ngx_tcp_upstream_keepalive.c @@ -0,0 +1,483 @@ + +#include +#include +#include +#include + +static ngx_int_t ngx_tcp_upstream_init_keepalive_peer(ngx_tcp_session_t *r, + ngx_tcp_upstream_srv_conf_t *us); +static ngx_int_t ngx_tcp_upstream_get_keepalive_peer(ngx_peer_connection_t *pc, + void *data); +static void ngx_tcp_upstream_free_keepalive_peer(ngx_peer_connection_t *pc, + void *data, ngx_uint_t state); + +static void ngx_tcp_upstream_keepalive_dummy_handler(ngx_event_t *ev); +static void ngx_tcp_upstream_keepalive_close_handler(ngx_event_t *ev); +static void ngx_tcp_upstream_keepalive_close(ngx_connection_t *c); + + +#if (NGX_HTTP_SSL) +static ngx_int_t ngx_tcp_upstream_keepalive_set_session( + ngx_peer_connection_t *pc, void *data); +static void ngx_tcp_upstream_keepalive_save_session(ngx_peer_connection_t *pc, + void *data); +#endif + +static void *ngx_tcp_upstream_keepalive_create_conf(ngx_conf_t *cf); +static char *ngx_tcp_upstream_keepalive(ngx_conf_t *cf, ngx_command_t *cmd, + void *conf); + + +static ngx_command_t ngx_tcp_upstream_keepalive_commands[] = { + + { ngx_string("keepalive"), + NGX_TCP_UPS_CONF|NGX_CONF_TAKE12, + ngx_tcp_upstream_keepalive, + NGX_TCP_SRV_CONF_OFFSET, + 0, + NULL }, + + ngx_null_command +}; + + +static ngx_tcp_module_t ngx_tcp_upstream_keepalive_module_ctx = { + NULL, + NULL, + NULL, + ngx_tcp_upstream_keepalive_create_conf, + NULL +}; + + +ngx_module_t ngx_tcp_upstream_keepalive_module = { + NGX_MODULE_V1, + &ngx_tcp_upstream_keepalive_module_ctx, /* module context */ + ngx_tcp_upstream_keepalive_commands, /* module directives */ + NGX_TCP_MODULE, /* module type */ + NULL, /* init master */ + NULL, /* init module */ + NULL, /* init process */ + NULL, /* init thread */ + NULL, /* exit thread */ + NULL, /* exit process */ + NULL, /* exit master */ + NGX_MODULE_V1_PADDING +}; + + +static ngx_int_t +ngx_tcp_upstream_init_keepalive(ngx_conf_t *cf, + ngx_tcp_upstream_srv_conf_t *us) +{ + ngx_uint_t i; + ngx_tcp_upstream_keepalive_srv_conf_t *kcf; + ngx_tcp_upstream_keepalive_cache_t *cached; + + ngx_log_debug0(NGX_LOG_DEBUG_HTTP, cf->log, 0, + "init keepalive"); + + kcf = ngx_tcp_conf_upstream_srv_conf(us, + ngx_tcp_upstream_keepalive_module); + + if (kcf->original_init_upstream(cf, us) != NGX_OK) { + return NGX_ERROR; + } + + kcf->original_init_peer = us->peer.init; + + us->peer.init = ngx_tcp_upstream_init_keepalive_peer; + + /* allocate cache items and add to free queue */ + + cached = ngx_pcalloc(cf->pool, + sizeof(ngx_tcp_upstream_keepalive_cache_t) * kcf->max_cached); + if (cached == NULL) { + return NGX_ERROR; + } + + ngx_queue_init(&kcf->cache); + ngx_queue_init(&kcf->free); + + for (i = 0; i < kcf->max_cached; i++) { + ngx_queue_insert_head(&kcf->free, &cached[i].queue); + cached[i].conf = kcf; + } + + return NGX_OK; +} + + +static ngx_int_t +ngx_tcp_upstream_init_keepalive_peer(ngx_tcp_session_t *r, + ngx_tcp_upstream_srv_conf_t *us) +{ + ngx_tcp_upstream_keepalive_peer_data_t *kp; + ngx_tcp_upstream_keepalive_srv_conf_t *kcf; + + ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, + "init keepalive peer"); + + kcf = ngx_tcp_conf_upstream_srv_conf(us, + ngx_tcp_upstream_keepalive_module); + + kp = ngx_palloc(r->pool, sizeof(ngx_tcp_upstream_keepalive_peer_data_t)); + if (kp == NULL) { + return NGX_ERROR; + } + + if (kcf->original_init_peer(r, us) != NGX_OK) { + return NGX_ERROR; + } + + kp->conf = kcf; + kp->upstream = r->upstream; + kp->data = r->upstream->peer.data; + kp->original_get_peer = r->upstream->peer.get; + kp->original_free_peer = r->upstream->peer.free; + + r->upstream->peer.data = kp; + r->upstream->peer.get = ngx_tcp_upstream_get_keepalive_peer; + r->upstream->peer.free = ngx_tcp_upstream_free_keepalive_peer; + + r->upstream->keepalive = 1; + +#if (NGX_HTTP_SSL) + kp->original_set_session = r->upstream->peer.set_session; + kp->original_save_session = r->upstream->peer.save_session; + r->upstream->peer.set_session = ngx_tcp_upstream_keepalive_set_session; + r->upstream->peer.save_session = ngx_tcp_upstream_keepalive_save_session; +#endif + + return NGX_OK; +} + + +static ngx_int_t +ngx_tcp_upstream_get_keepalive_peer(ngx_peer_connection_t *pc, void *data) +{ + ngx_tcp_upstream_keepalive_peer_data_t *kp = data; + ngx_tcp_upstream_keepalive_cache_t *item; + + ngx_int_t rc; + ngx_queue_t *q, *cache; + ngx_connection_t *c; + + ngx_log_debug0(NGX_LOG_DEBUG_HTTP, pc->log, 0, + "get keepalive peer"); + + /* ask balancer */ + + rc = kp->original_get_peer(pc, kp->data); + + if (rc != NGX_OK) { + return rc; + } + + /* search cache for suitable connection */ + + cache = &kp->conf->cache; + + for (q = ngx_queue_head(cache); + q != ngx_queue_sentinel(cache); + q = ngx_queue_next(q)) + { + item = ngx_queue_data(q, ngx_tcp_upstream_keepalive_cache_t, queue); + c = item->connection; + + if (ngx_memn2cmp((u_char *) &item->sockaddr, (u_char *) pc->sockaddr, + item->socklen, pc->socklen) + == 0) + { + ngx_queue_remove(q); + ngx_queue_insert_head(&kp->conf->free, q); + + ngx_log_debug1(NGX_LOG_DEBUG_HTTP, pc->log, 0, + "get keepalive peer: using connection %p", c); + + c->idle = 0; + c->log = pc->log; + c->read->log = pc->log; + c->write->log = pc->log; + c->pool->log = pc->log; + + pc->connection = c; + pc->cached = 1; + + return NGX_DONE; + } + } + + return NGX_OK; +} + + +static void +ngx_tcp_upstream_free_keepalive_peer(ngx_peer_connection_t *pc, void *data, + ngx_uint_t state) +{ + ngx_tcp_upstream_keepalive_peer_data_t *kp = data; + ngx_tcp_upstream_keepalive_cache_t *item; + + ngx_queue_t *q; + ngx_connection_t *c; + ngx_tcp_upstream_t *u; + + ngx_log_debug0(NGX_LOG_DEBUG_HTTP, pc->log, 0, + "free keepalive peer"); + + /* cache valid connections */ + + u = kp->upstream; + c = pc->connection; + + if (state & NGX_PEER_FAILED + || c == NULL + || c->read->error + || c->read->timedout + || c->write->error + || c->write->timedout) + { + goto invalid; + } + + if (!u->keepalive) { + goto invalid; + } + + if (ngx_handle_read_event(c->read, 0) != NGX_OK) { + goto invalid; + } + + ngx_log_debug1(NGX_LOG_DEBUG_HTTP, pc->log, 0, + "free keepalive peer: saving connection %p", c); + + if (ngx_queue_empty(&kp->conf->free)) { + + q = ngx_queue_last(&kp->conf->cache); + ngx_queue_remove(q); + + item = ngx_queue_data(q, ngx_tcp_upstream_keepalive_cache_t, queue); + + ngx_tcp_upstream_keepalive_close(item->connection); + + } else { + q = ngx_queue_head(&kp->conf->free); + ngx_queue_remove(q); + + item = ngx_queue_data(q, ngx_tcp_upstream_keepalive_cache_t, queue); + } + + item->connection = c; + ngx_queue_insert_head(&kp->conf->cache, q); + + pc->connection = NULL; + + if (c->read->timer_set) { + ngx_del_timer(c->read); + } + if (c->write->timer_set) { + ngx_del_timer(c->write); + } + + c->write->handler = ngx_tcp_upstream_keepalive_dummy_handler; + c->read->handler = ngx_tcp_upstream_keepalive_close_handler; + + c->data = item; + c->idle = 1; + c->log = ngx_cycle->log; + c->read->log = ngx_cycle->log; + c->write->log = ngx_cycle->log; + c->pool->log = ngx_cycle->log; + + item->socklen = pc->socklen; + ngx_memcpy(&item->sockaddr, pc->sockaddr, pc->socklen); + + if (c->read->ready) { + ngx_tcp_upstream_keepalive_close_handler(c->read); + } + +invalid: + + kp->original_free_peer(pc, kp->data, state); +} + + +static void +ngx_tcp_upstream_keepalive_dummy_handler(ngx_event_t *ev) +{ + ngx_log_debug0(NGX_LOG_DEBUG_HTTP, ev->log, 0, + "keepalive dummy handler"); +} + + +static void +ngx_tcp_upstream_keepalive_close_handler(ngx_event_t *ev) +{ + ngx_tcp_upstream_keepalive_srv_conf_t *conf; + ngx_tcp_upstream_keepalive_cache_t *item; + + int n; + char buf[1]; + ngx_connection_t *c; + + ngx_log_debug0(NGX_LOG_DEBUG_HTTP, ev->log, 0, + "keepalive close handler"); + + c = ev->data; + + if (c->close) { + goto close; + } + + n = recv(c->fd, buf, 1, MSG_PEEK); + + if (n == -1 && ngx_socket_errno == NGX_EAGAIN) { + /* stale event */ + + if (ngx_handle_read_event(c->read, 0) != NGX_OK) { + goto close; + } + + return; + } + +close: + + item = c->data; + conf = item->conf; + + ngx_tcp_upstream_keepalive_close(c); + + ngx_queue_remove(&item->queue); + ngx_queue_insert_head(&conf->free, &item->queue); +} + + +static void +ngx_tcp_upstream_keepalive_close(ngx_connection_t *c) +{ + +#if (NGX_HTTP_SSL) + + if (c->ssl) { + c->ssl->no_wait_shutdown = 1; + c->ssl->no_send_shutdown = 1; + + if (ngx_ssl_shutdown(c) == NGX_AGAIN) { + c->ssl->handler = ngx_tcp_upstream_keepalive_close; + return; + } + } + +#endif + + ngx_destroy_pool(c->pool); + ngx_close_connection(c); +} + + +#if (NGX_HTTP_SSL) + +static ngx_int_t +ngx_tcp_upstream_keepalive_set_session(ngx_peer_connection_t *pc, void *data) +{ + ngx_tcp_upstream_keepalive_peer_data_t *kp = data; + + return kp->original_set_session(pc, kp->data); +} + + +static void +ngx_tcp_upstream_keepalive_save_session(ngx_peer_connection_t *pc, void *data) +{ + ngx_tcp_upstream_keepalive_peer_data_t *kp = data; + + kp->original_save_session(pc, kp->data); + return; +} + +#endif + + +static void * +ngx_tcp_upstream_keepalive_create_conf(ngx_conf_t *cf) +{ + ngx_tcp_upstream_keepalive_srv_conf_t *conf; + + conf = ngx_pcalloc(cf->pool, + sizeof(ngx_tcp_upstream_keepalive_srv_conf_t)); + if (conf == NULL) { + return NULL; + } + + /* + * set by ngx_pcalloc(): + * + * conf->original_init_upstream = NULL; + * conf->original_init_peer = NULL; + */ + + conf->max_cached = 1; + + return conf; +} + + +static char * +ngx_tcp_upstream_keepalive(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) +{ + ngx_tcp_upstream_srv_conf_t *uscf; + ngx_tcp_upstream_keepalive_srv_conf_t *kcf = conf; + + ngx_int_t n; + ngx_str_t *value; + ngx_uint_t i; + + uscf = ngx_tcp_conf_get_module_srv_conf(cf, ngx_tcp_upstream_module); + + if (kcf->original_init_upstream) { + return "is duplicate"; + } + + kcf->original_init_upstream = uscf->peer.init_upstream + ? uscf->peer.init_upstream + : ngx_tcp_upstream_init_round_robin; + + uscf->peer.init_upstream = ngx_tcp_upstream_init_keepalive; + + /* read options */ + + value = cf->args->elts; + + n = ngx_atoi(value[1].data, value[1].len); + + if (n == NGX_ERROR || n == 0) { + ngx_conf_log_error(NGX_LOG_EMERG, cf, 0, + "invalid value \"%V\" in \"%V\" directive", + &value[1], &cmd->name); + return NGX_CONF_ERROR; + } + + kcf->max_cached = n; + + for (i = 2; i < cf->args->nelts; i++) { + + if (ngx_strcmp(value[i].data, "single") == 0) { + ngx_conf_log_error(NGX_LOG_WARN, cf, 0, + "the \"single\" parameter is deprecated"); + continue; + } + + goto invalid; + } + + return NGX_CONF_OK; + +invalid: + + ngx_conf_log_error(NGX_LOG_EMERG, cf, 0, + "invalid parameter \"%V\"", &value[i]); + + return NGX_CONF_ERROR; +} diff --git a/ngx_tcp_upstream_keepalive.h b/ngx_tcp_upstream_keepalive.h new file mode 100644 index 0000000..9118e60 --- /dev/null +++ b/ngx_tcp_upstream_keepalive.h @@ -0,0 +1,50 @@ + +#ifndef _NGX_TCP_UPSTREAM_KEEPALIVE_H_INCLUDED_ +#define _NGX_TCP_UPSTREAM_KEEPALIVE_H_INCLUDED_ + +#include +#include +#include + +typedef struct { + ngx_uint_t max_cached; + + ngx_queue_t cache; + ngx_queue_t free; + + ngx_tcp_upstream_init_pt original_init_upstream; + ngx_tcp_upstream_init_peer_pt original_init_peer; + +} ngx_tcp_upstream_keepalive_srv_conf_t; + + +typedef struct { + ngx_tcp_upstream_keepalive_srv_conf_t *conf; + + ngx_tcp_upstream_t *upstream; + + void *data; + + ngx_event_get_peer_pt original_get_peer; + ngx_event_free_peer_pt original_free_peer; + +#if (NGX_HTTP_SSL) + ngx_event_set_peer_session_pt original_set_session; + ngx_event_save_peer_session_pt original_save_session; +#endif + +} ngx_tcp_upstream_keepalive_peer_data_t; + + +typedef struct { + ngx_tcp_upstream_keepalive_srv_conf_t *conf; + + ngx_queue_t queue; + ngx_connection_t *connection; + + socklen_t socklen; + u_char sockaddr[NGX_SOCKADDRLEN]; + +} ngx_tcp_upstream_keepalive_cache_t; + +#endif