From d3a4ad05625fed50d038c0c9aef521898bab5071 Mon Sep 17 00:00:00 2001 From: ideal Date: Wed, 16 Oct 2013 21:43:48 +0800 Subject: [PATCH 01/34] add keepalive file, not available currently --- config | 2 +- ngx_tcp_upstream_keepalive.c | 537 +++++++++++++++++++++++++++++++++++ 2 files changed, 538 insertions(+), 1 deletion(-) create mode 100644 ngx_tcp_upstream_keepalive.c diff --git a/config b/config index b661fc9..e333b66 100644 --- a/config +++ b/config @@ -4,7 +4,7 @@ ngx_feature_run=no ngx_feature_incs= ngx_feature_path="$ngx_addon_dir/modules $ngx_addon_dir/parsers $ngx_addon_dir" ngx_feature_deps="$ngx_addon_dir/ngx_tcp.h $ngx_addon_dir/ngx_tcp_session.h $ngx_addon_dir/ngx_tcp_upstream.h $ngx_addon_dir/ngx_tcp_upstream_check.h $ngx_addon_dir/ngx_tcp_upstream_round_robin.h" -ngx_tcp_src="$ngx_addon_dir/ngx_tcp.c $ngx_addon_dir/ngx_tcp_core_module.c $ngx_addon_dir/ngx_tcp_session.c $ngx_addon_dir/ngx_tcp_access.c $ngx_addon_dir/ngx_tcp_log.c $ngx_addon_dir/ngx_tcp_upstream.c $ngx_addon_dir/ngx_tcp_upstream_round_robin.c $ngx_addon_dir/modules/ngx_tcp_generic_proxy_module.c $ngx_addon_dir/modules/ngx_tcp_websocket_proxy_module.c $ngx_addon_dir/modules/ngx_tcp_upstream_ip_hash_module.c $ngx_addon_dir/modules/ngx_tcp_upstream_busyness_module.c $ngx_addon_dir/ngx_tcp_upstream_check.c " +ngx_tcp_src="$ngx_addon_dir/ngx_tcp.c $ngx_addon_dir/ngx_tcp_core_module.c $ngx_addon_dir/ngx_tcp_session.c $ngx_addon_dir/ngx_tcp_access.c $ngx_addon_dir/ngx_tcp_log.c $ngx_addon_dir/ngx_tcp_upstream.c $ngx_addon_dir/ngx_tcp_upstream_round_robin.c $ngx_addon_dir/modules/ngx_tcp_generic_proxy_module.c $ngx_addon_dir/modules/ngx_tcp_websocket_proxy_module.c $ngx_addon_dir/modules/ngx_tcp_upstream_ip_hash_module.c $ngx_addon_dir/modules/ngx_tcp_upstream_busyness_module.c $ngx_addon_dir/ngx_tcp_upstream_check.c $ngx_addon_dir/ngx_tcp_upstream_keepalive.c" ngx_tcp_ssl_deps="$ngx_addon_dir/modules/ngx_tcp_ssl_module.h" ngx_tcp_ssl_src="$ngx_addon_dir/modules/ngx_tcp_ssl_module.c" ngx_tcp_parser_deps="$ngx_addon_dir/parsers/parser.h $ngx_addon_dir/parsers/http_request_parser.h $ngx_addon_dir/parsers/http_response_parser.h $ngx_addon_dir/parsers/smtp_response_parser.h" diff --git a/ngx_tcp_upstream_keepalive.c b/ngx_tcp_upstream_keepalive.c new file mode 100644 index 0000000..375870a --- /dev/null +++ b/ngx_tcp_upstream_keepalive.c @@ -0,0 +1,537 @@ + +/* + * Copyright (C) Maxim Dounin + * Copyright (C) Nginx, Inc. + */ + + +#include +#include +#include +#include + + +typedef struct { + ngx_uint_t max_cached; + + ngx_queue_t cache; + ngx_queue_t free; + + ngx_http_upstream_init_pt original_init_upstream; + ngx_http_upstream_init_peer_pt original_init_peer; + +} ngx_http_upstream_keepalive_srv_conf_t; + + +typedef struct { + ngx_http_upstream_keepalive_srv_conf_t *conf; + + ngx_http_upstream_t *upstream; + + void *data; + + ngx_event_get_peer_pt original_get_peer; + ngx_event_free_peer_pt original_free_peer; + +#if (NGX_HTTP_SSL) + ngx_event_set_peer_session_pt original_set_session; + ngx_event_save_peer_session_pt original_save_session; +#endif + +} ngx_http_upstream_keepalive_peer_data_t; + + +typedef struct { + ngx_http_upstream_keepalive_srv_conf_t *conf; + + ngx_queue_t queue; + ngx_connection_t *connection; + + socklen_t socklen; + u_char sockaddr[NGX_SOCKADDRLEN]; + +} ngx_http_upstream_keepalive_cache_t; + + +static ngx_int_t ngx_http_upstream_init_keepalive_peer(ngx_http_request_t *r, + ngx_http_upstream_srv_conf_t *us); +static ngx_int_t ngx_http_upstream_get_keepalive_peer(ngx_peer_connection_t *pc, + void *data); +static void ngx_http_upstream_free_keepalive_peer(ngx_peer_connection_t *pc, + void *data, ngx_uint_t state); + +static void ngx_http_upstream_keepalive_dummy_handler(ngx_event_t *ev); +static void ngx_http_upstream_keepalive_close_handler(ngx_event_t *ev); +static void ngx_http_upstream_keepalive_close(ngx_connection_t *c); + + +#if (NGX_HTTP_SSL) +static ngx_int_t ngx_http_upstream_keepalive_set_session( + ngx_peer_connection_t *pc, void *data); +static void ngx_http_upstream_keepalive_save_session(ngx_peer_connection_t *pc, + void *data); +#endif + +static void *ngx_http_upstream_keepalive_create_conf(ngx_conf_t *cf); +static char *ngx_http_upstream_keepalive(ngx_conf_t *cf, ngx_command_t *cmd, + void *conf); + + +static ngx_command_t ngx_http_upstream_keepalive_commands[] = { + + { ngx_string("keepalive"), + NGX_HTTP_UPS_CONF|NGX_CONF_TAKE12, + ngx_http_upstream_keepalive, + NGX_HTTP_SRV_CONF_OFFSET, + 0, + NULL }, + + ngx_null_command +}; + + +static ngx_http_module_t ngx_tcp_upstream_keepalive_module_ctx = { + NULL, /* preconfiguration */ + NULL, /* postconfiguration */ + + NULL, /* create main configuration */ + NULL, /* init main configuration */ + + ngx_http_upstream_keepalive_create_conf, /* create server configuration */ + NULL, /* merge server configuration */ + + NULL, /* create location configuration */ + NULL /* merge location configuration */ +}; + + +ngx_module_t ngx_tcp_upstream_keepalive_module = { + NGX_MODULE_V1, + &ngx_tcp_upstream_keepalive_module_ctx, /* module context */ + ngx_http_upstream_keepalive_commands, /* module directives */ + NGX_TCP_MODULE, /* module type */ + NULL, /* init master */ + NULL, /* init module */ + NULL, /* init process */ + NULL, /* init thread */ + NULL, /* exit thread */ + NULL, /* exit process */ + NULL, /* exit master */ + NGX_MODULE_V1_PADDING +}; + + +static ngx_int_t +ngx_http_upstream_init_keepalive(ngx_conf_t *cf, + ngx_http_upstream_srv_conf_t *us) +{ + ngx_uint_t i; + ngx_http_upstream_keepalive_srv_conf_t *kcf; + ngx_http_upstream_keepalive_cache_t *cached; + + ngx_log_debug0(NGX_LOG_DEBUG_HTTP, cf->log, 0, + "init keepalive"); + + kcf = ngx_http_conf_upstream_srv_conf(us, + ngx_tcp_upstream_keepalive_module); + + if (kcf->original_init_upstream(cf, us) != NGX_OK) { + return NGX_ERROR; + } + + kcf->original_init_peer = us->peer.init; + + us->peer.init = ngx_http_upstream_init_keepalive_peer; + + /* allocate cache items and add to free queue */ + + cached = ngx_pcalloc(cf->pool, + sizeof(ngx_http_upstream_keepalive_cache_t) * kcf->max_cached); + if (cached == NULL) { + return NGX_ERROR; + } + + ngx_queue_init(&kcf->cache); + ngx_queue_init(&kcf->free); + + for (i = 0; i < kcf->max_cached; i++) { + ngx_queue_insert_head(&kcf->free, &cached[i].queue); + cached[i].conf = kcf; + } + + return NGX_OK; +} + + +static ngx_int_t +ngx_http_upstream_init_keepalive_peer(ngx_http_request_t *r, + ngx_http_upstream_srv_conf_t *us) +{ + ngx_http_upstream_keepalive_peer_data_t *kp; + ngx_http_upstream_keepalive_srv_conf_t *kcf; + + ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, + "init keepalive peer"); + + kcf = ngx_http_conf_upstream_srv_conf(us, + ngx_tcp_upstream_keepalive_module); + + kp = ngx_palloc(r->pool, sizeof(ngx_http_upstream_keepalive_peer_data_t)); + if (kp == NULL) { + return NGX_ERROR; + } + + if (kcf->original_init_peer(r, us) != NGX_OK) { + return NGX_ERROR; + } + + kp->conf = kcf; + kp->upstream = r->upstream; + kp->data = r->upstream->peer.data; + kp->original_get_peer = r->upstream->peer.get; + kp->original_free_peer = r->upstream->peer.free; + + r->upstream->peer.data = kp; + r->upstream->peer.get = ngx_http_upstream_get_keepalive_peer; + r->upstream->peer.free = ngx_http_upstream_free_keepalive_peer; + +#if (NGX_HTTP_SSL) + kp->original_set_session = r->upstream->peer.set_session; + kp->original_save_session = r->upstream->peer.save_session; + r->upstream->peer.set_session = ngx_http_upstream_keepalive_set_session; + r->upstream->peer.save_session = ngx_http_upstream_keepalive_save_session; +#endif + + return NGX_OK; +} + + +static ngx_int_t +ngx_http_upstream_get_keepalive_peer(ngx_peer_connection_t *pc, void *data) +{ + ngx_http_upstream_keepalive_peer_data_t *kp = data; + ngx_http_upstream_keepalive_cache_t *item; + + ngx_int_t rc; + ngx_queue_t *q, *cache; + ngx_connection_t *c; + + ngx_log_debug0(NGX_LOG_DEBUG_HTTP, pc->log, 0, + "get keepalive peer"); + + /* ask balancer */ + + rc = kp->original_get_peer(pc, kp->data); + + if (rc != NGX_OK) { + return rc; + } + + /* search cache for suitable connection */ + + cache = &kp->conf->cache; + + for (q = ngx_queue_head(cache); + q != ngx_queue_sentinel(cache); + q = ngx_queue_next(q)) + { + item = ngx_queue_data(q, ngx_http_upstream_keepalive_cache_t, queue); + c = item->connection; + + if (ngx_memn2cmp((u_char *) &item->sockaddr, (u_char *) pc->sockaddr, + item->socklen, pc->socklen) + == 0) + { + ngx_queue_remove(q); + ngx_queue_insert_head(&kp->conf->free, q); + + ngx_log_debug1(NGX_LOG_DEBUG_HTTP, pc->log, 0, + "get keepalive peer: using connection %p", c); + + c->idle = 0; + c->log = pc->log; + c->read->log = pc->log; + c->write->log = pc->log; + c->pool->log = pc->log; + + pc->connection = c; + pc->cached = 1; + + return NGX_DONE; + } + } + + return NGX_OK; +} + + +static void +ngx_http_upstream_free_keepalive_peer(ngx_peer_connection_t *pc, void *data, + ngx_uint_t state) +{ + ngx_http_upstream_keepalive_peer_data_t *kp = data; + ngx_http_upstream_keepalive_cache_t *item; + + ngx_queue_t *q; + ngx_connection_t *c; + ngx_http_upstream_t *u; + + ngx_log_debug0(NGX_LOG_DEBUG_HTTP, pc->log, 0, + "free keepalive peer"); + + /* cache valid connections */ + + u = kp->upstream; + c = pc->connection; + + if (state & NGX_PEER_FAILED + || c == NULL + || c->read->eof + || c->read->error + || c->read->timedout + || c->write->error + || c->write->timedout) + { + goto invalid; + } + + if (!u->keepalive) { + goto invalid; + } + + if (ngx_handle_read_event(c->read, 0) != NGX_OK) { + goto invalid; + } + + ngx_log_debug1(NGX_LOG_DEBUG_HTTP, pc->log, 0, + "free keepalive peer: saving connection %p", c); + + if (ngx_queue_empty(&kp->conf->free)) { + + q = ngx_queue_last(&kp->conf->cache); + ngx_queue_remove(q); + + item = ngx_queue_data(q, ngx_http_upstream_keepalive_cache_t, queue); + + ngx_http_upstream_keepalive_close(item->connection); + + } else { + q = ngx_queue_head(&kp->conf->free); + ngx_queue_remove(q); + + item = ngx_queue_data(q, ngx_http_upstream_keepalive_cache_t, queue); + } + + item->connection = c; + ngx_queue_insert_head(&kp->conf->cache, q); + + pc->connection = NULL; + + if (c->read->timer_set) { + ngx_del_timer(c->read); + } + if (c->write->timer_set) { + ngx_del_timer(c->write); + } + + c->write->handler = ngx_http_upstream_keepalive_dummy_handler; + c->read->handler = ngx_http_upstream_keepalive_close_handler; + + c->data = item; + c->idle = 1; + c->log = ngx_cycle->log; + c->read->log = ngx_cycle->log; + c->write->log = ngx_cycle->log; + c->pool->log = ngx_cycle->log; + + item->socklen = pc->socklen; + ngx_memcpy(&item->sockaddr, pc->sockaddr, pc->socklen); + + if (c->read->ready) { + ngx_http_upstream_keepalive_close_handler(c->read); + } + +invalid: + + kp->original_free_peer(pc, kp->data, state); +} + + +static void +ngx_http_upstream_keepalive_dummy_handler(ngx_event_t *ev) +{ + ngx_log_debug0(NGX_LOG_DEBUG_HTTP, ev->log, 0, + "keepalive dummy handler"); +} + + +static void +ngx_http_upstream_keepalive_close_handler(ngx_event_t *ev) +{ + ngx_http_upstream_keepalive_srv_conf_t *conf; + ngx_http_upstream_keepalive_cache_t *item; + + int n; + char buf[1]; + ngx_connection_t *c; + + ngx_log_debug0(NGX_LOG_DEBUG_HTTP, ev->log, 0, + "keepalive close handler"); + + c = ev->data; + + if (c->close) { + goto close; + } + + n = recv(c->fd, buf, 1, MSG_PEEK); + + if (n == -1 && ngx_socket_errno == NGX_EAGAIN) { + /* stale event */ + + if (ngx_handle_read_event(c->read, 0) != NGX_OK) { + goto close; + } + + return; + } + +close: + + item = c->data; + conf = item->conf; + + ngx_http_upstream_keepalive_close(c); + + ngx_queue_remove(&item->queue); + ngx_queue_insert_head(&conf->free, &item->queue); +} + + +static void +ngx_http_upstream_keepalive_close(ngx_connection_t *c) +{ + +#if (NGX_HTTP_SSL) + + if (c->ssl) { + c->ssl->no_wait_shutdown = 1; + c->ssl->no_send_shutdown = 1; + + if (ngx_ssl_shutdown(c) == NGX_AGAIN) { + c->ssl->handler = ngx_http_upstream_keepalive_close; + return; + } + } + +#endif + + ngx_destroy_pool(c->pool); + ngx_close_connection(c); +} + + +#if (NGX_HTTP_SSL) + +static ngx_int_t +ngx_http_upstream_keepalive_set_session(ngx_peer_connection_t *pc, void *data) +{ + ngx_http_upstream_keepalive_peer_data_t *kp = data; + + return kp->original_set_session(pc, kp->data); +} + + +static void +ngx_http_upstream_keepalive_save_session(ngx_peer_connection_t *pc, void *data) +{ + ngx_http_upstream_keepalive_peer_data_t *kp = data; + + kp->original_save_session(pc, kp->data); + return; +} + +#endif + + +static void * +ngx_http_upstream_keepalive_create_conf(ngx_conf_t *cf) +{ + ngx_http_upstream_keepalive_srv_conf_t *conf; + + conf = ngx_pcalloc(cf->pool, + sizeof(ngx_http_upstream_keepalive_srv_conf_t)); + if (conf == NULL) { + return NULL; + } + + /* + * set by ngx_pcalloc(): + * + * conf->original_init_upstream = NULL; + * conf->original_init_peer = NULL; + */ + + conf->max_cached = 1; + + return conf; +} + + +static char * +ngx_http_upstream_keepalive(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) +{ + ngx_http_upstream_srv_conf_t *uscf; + ngx_http_upstream_keepalive_srv_conf_t *kcf = conf; + + ngx_int_t n; + ngx_str_t *value; + ngx_uint_t i; + + uscf = ngx_http_conf_get_module_srv_conf(cf, ngx_http_upstream_module); + + if (kcf->original_init_upstream) { + return "is duplicate"; + } + + kcf->original_init_upstream = uscf->peer.init_upstream + ? uscf->peer.init_upstream + : ngx_http_upstream_init_round_robin; + + uscf->peer.init_upstream = ngx_http_upstream_init_keepalive; + + /* read options */ + + value = cf->args->elts; + + n = ngx_atoi(value[1].data, value[1].len); + + if (n == NGX_ERROR || n == 0) { + ngx_conf_log_error(NGX_LOG_EMERG, cf, 0, + "invalid value \"%V\" in \"%V\" directive", + &value[1], &cmd->name); + return NGX_CONF_ERROR; + } + + kcf->max_cached = n; + + for (i = 2; i < cf->args->nelts; i++) { + + if (ngx_strcmp(value[i].data, "single") == 0) { + ngx_conf_log_error(NGX_LOG_WARN, cf, 0, + "the \"single\" parameter is deprecated"); + continue; + } + + goto invalid; + } + + return NGX_CONF_OK; + +invalid: + + ngx_conf_log_error(NGX_LOG_EMERG, cf, 0, + "invalid parameter \"%V\"", &value[i]); + + return NGX_CONF_ERROR; +} From acedd90fea72ad6c994a4423827709808dc5a887 Mon Sep 17 00:00:00 2001 From: ideal Date: Thu, 17 Oct 2013 17:33:14 +0800 Subject: [PATCH 02/34] add file --- config | 2 +- ngx_tcp_upstream_keepalive.c | 51 +----------------------------------- ngx_tcp_upstream_keepalive.h | 50 +++++++++++++++++++++++++++++++++++ 3 files changed, 52 insertions(+), 51 deletions(-) create mode 100644 ngx_tcp_upstream_keepalive.h diff --git a/config b/config index e333b66..2fd811f 100644 --- a/config +++ b/config @@ -3,7 +3,7 @@ ngx_feature_name= ngx_feature_run=no ngx_feature_incs= ngx_feature_path="$ngx_addon_dir/modules $ngx_addon_dir/parsers $ngx_addon_dir" -ngx_feature_deps="$ngx_addon_dir/ngx_tcp.h $ngx_addon_dir/ngx_tcp_session.h $ngx_addon_dir/ngx_tcp_upstream.h $ngx_addon_dir/ngx_tcp_upstream_check.h $ngx_addon_dir/ngx_tcp_upstream_round_robin.h" +ngx_feature_deps="$ngx_addon_dir/ngx_tcp.h $ngx_addon_dir/ngx_tcp_session.h $ngx_addon_dir/ngx_tcp_upstream.h $ngx_addon_dir/ngx_tcp_upstream_check.h $ngx_addon_dir/ngx_tcp_upstream_round_robin.h $ngx_addon_dir/ngx_tcp_upstream_keepalive.h" ngx_tcp_src="$ngx_addon_dir/ngx_tcp.c $ngx_addon_dir/ngx_tcp_core_module.c $ngx_addon_dir/ngx_tcp_session.c $ngx_addon_dir/ngx_tcp_access.c $ngx_addon_dir/ngx_tcp_log.c $ngx_addon_dir/ngx_tcp_upstream.c $ngx_addon_dir/ngx_tcp_upstream_round_robin.c $ngx_addon_dir/modules/ngx_tcp_generic_proxy_module.c $ngx_addon_dir/modules/ngx_tcp_websocket_proxy_module.c $ngx_addon_dir/modules/ngx_tcp_upstream_ip_hash_module.c $ngx_addon_dir/modules/ngx_tcp_upstream_busyness_module.c $ngx_addon_dir/ngx_tcp_upstream_check.c $ngx_addon_dir/ngx_tcp_upstream_keepalive.c" ngx_tcp_ssl_deps="$ngx_addon_dir/modules/ngx_tcp_ssl_module.h" ngx_tcp_ssl_src="$ngx_addon_dir/modules/ngx_tcp_ssl_module.c" diff --git a/ngx_tcp_upstream_keepalive.c b/ngx_tcp_upstream_keepalive.c index 375870a..d32f552 100644 --- a/ngx_tcp_upstream_keepalive.c +++ b/ngx_tcp_upstream_keepalive.c @@ -1,57 +1,8 @@ -/* - * Copyright (C) Maxim Dounin - * Copyright (C) Nginx, Inc. - */ - - #include #include -#include #include - - -typedef struct { - ngx_uint_t max_cached; - - ngx_queue_t cache; - ngx_queue_t free; - - ngx_http_upstream_init_pt original_init_upstream; - ngx_http_upstream_init_peer_pt original_init_peer; - -} ngx_http_upstream_keepalive_srv_conf_t; - - -typedef struct { - ngx_http_upstream_keepalive_srv_conf_t *conf; - - ngx_http_upstream_t *upstream; - - void *data; - - ngx_event_get_peer_pt original_get_peer; - ngx_event_free_peer_pt original_free_peer; - -#if (NGX_HTTP_SSL) - ngx_event_set_peer_session_pt original_set_session; - ngx_event_save_peer_session_pt original_save_session; -#endif - -} ngx_http_upstream_keepalive_peer_data_t; - - -typedef struct { - ngx_http_upstream_keepalive_srv_conf_t *conf; - - ngx_queue_t queue; - ngx_connection_t *connection; - - socklen_t socklen; - u_char sockaddr[NGX_SOCKADDRLEN]; - -} ngx_http_upstream_keepalive_cache_t; - +#include static ngx_int_t ngx_http_upstream_init_keepalive_peer(ngx_http_request_t *r, ngx_http_upstream_srv_conf_t *us); diff --git a/ngx_tcp_upstream_keepalive.h b/ngx_tcp_upstream_keepalive.h new file mode 100644 index 0000000..4759ac9 --- /dev/null +++ b/ngx_tcp_upstream_keepalive.h @@ -0,0 +1,50 @@ + +#ifndef _NGX_TCP_UPSTREAM_KEEPALIVE_H_INCLUDED_ +#define _NGX_TCP_UPSTREAM_KEEPALIVE_H_INCLUDED_ + +#include +#include +#include + +typedef struct { + ngx_uint_t max_cached; + + ngx_queue_t cache; + ngx_queue_t free; + + ngx_tcp_upstream_init_pt original_init_upstream; + ngx_tcp_upstream_init_peer_pt original_init_peer; + +} ngx_tcp_upstream_keepalive_srv_conf_t; + + +typedef struct { + ngx_tcp_upstream_keepalive_srv_conf_t *conf; + + ngx_tcp_upstream_t *upstream; + + void *data; + + ngx_event_get_peer_pt original_get_peer; + ngx_event_free_peer_pt original_free_peer; + +#if (NGX_HTTP_SSL) + ngx_event_set_peer_session_pt original_set_session; + ngx_event_save_peer_session_pt original_save_session; +#endif + +} ngx_tcp_upstream_keepalive_peer_data_t; + + +typedef struct { + ngx_http_upstream_keepalive_srv_conf_t *conf; + + ngx_queue_t queue; + ngx_connection_t *connection; + + socklen_t socklen; + u_char sockaddr[NGX_SOCKADDRLEN]; + +} ngx_http_upstream_keepalive_cache_t; + +#endif From b28f3365cad93fa6df5fa52bf260822e28b10df8 Mon Sep 17 00:00:00 2001 From: ideal Date: Thu, 17 Oct 2013 17:34:38 +0800 Subject: [PATCH 03/34] modify gitignore --- .gitignore | 1 + 1 file changed, 1 insertion(+) diff --git a/.gitignore b/.gitignore index 27fbb5a..4a8bfc6 100644 --- a/.gitignore +++ b/.gitignore @@ -4,3 +4,4 @@ Makefile cscope* objs +tags From 43c120b0c06cc85c0976bfbe160ab290e71699b2 Mon Sep 17 00:00:00 2001 From: ideal Date: Thu, 17 Oct 2013 18:26:53 +0800 Subject: [PATCH 04/34] change tcp --- ngx_tcp_upstream_keepalive.c | 176 +++++++++++++++++------------------ ngx_tcp_upstream_keepalive.h | 4 +- 2 files changed, 87 insertions(+), 93 deletions(-) diff --git a/ngx_tcp_upstream_keepalive.c b/ngx_tcp_upstream_keepalive.c index d32f552..74367a4 100644 --- a/ngx_tcp_upstream_keepalive.c +++ b/ngx_tcp_upstream_keepalive.c @@ -4,36 +4,36 @@ #include #include -static ngx_int_t ngx_http_upstream_init_keepalive_peer(ngx_http_request_t *r, - ngx_http_upstream_srv_conf_t *us); -static ngx_int_t ngx_http_upstream_get_keepalive_peer(ngx_peer_connection_t *pc, - void *data); -static void ngx_http_upstream_free_keepalive_peer(ngx_peer_connection_t *pc, - void *data, ngx_uint_t state); +static ngx_int_t ngx_tcp_upstream_init_keepalive_peer(ngx_tcp_session_t *r, + ngx_tcp_upstream_srv_conf_t *us); +static ngx_int_t ngx_tcp_upstream_get_keepalive_peer(ngx_peer_connection_t *pc, + void *data); +static void ngx_tcp_upstream_free_keepalive_peer(ngx_peer_connection_t *pc, + void *data, ngx_uint_t state); -static void ngx_http_upstream_keepalive_dummy_handler(ngx_event_t *ev); -static void ngx_http_upstream_keepalive_close_handler(ngx_event_t *ev); -static void ngx_http_upstream_keepalive_close(ngx_connection_t *c); +static void ngx_tcp_upstream_keepalive_dummy_handler(ngx_event_t *ev); +static void ngx_tcp_upstream_keepalive_close_handler(ngx_event_t *ev); +static void ngx_tcp_upstream_keepalive_close(ngx_connection_t *c); #if (NGX_HTTP_SSL) -static ngx_int_t ngx_http_upstream_keepalive_set_session( +static ngx_int_t ngx_tcp_upstream_keepalive_set_session( ngx_peer_connection_t *pc, void *data); -static void ngx_http_upstream_keepalive_save_session(ngx_peer_connection_t *pc, +static void ngx_tcp_upstream_keepalive_save_session(ngx_peer_connection_t *pc, void *data); #endif -static void *ngx_http_upstream_keepalive_create_conf(ngx_conf_t *cf); -static char *ngx_http_upstream_keepalive(ngx_conf_t *cf, ngx_command_t *cmd, +static void *ngx_tcp_upstream_keepalive_create_conf(ngx_conf_t *cf); +static char *ngx_tcp_upstream_keepalive(ngx_conf_t *cf, ngx_command_t *cmd, void *conf); -static ngx_command_t ngx_http_upstream_keepalive_commands[] = { +static ngx_command_t ngx_tcp_upstream_keepalive_commands[] = { { ngx_string("keepalive"), - NGX_HTTP_UPS_CONF|NGX_CONF_TAKE12, - ngx_http_upstream_keepalive, - NGX_HTTP_SRV_CONF_OFFSET, + NGX_TCP_UPS_CONF|NGX_CONF_TAKE12, + ngx_tcp_upstream_keepalive, + NGX_TCP_SRV_CONF_OFFSET, 0, NULL }, @@ -41,50 +41,44 @@ static ngx_command_t ngx_http_upstream_keepalive_commands[] = { }; -static ngx_http_module_t ngx_tcp_upstream_keepalive_module_ctx = { - NULL, /* preconfiguration */ - NULL, /* postconfiguration */ - - NULL, /* create main configuration */ - NULL, /* init main configuration */ - - ngx_http_upstream_keepalive_create_conf, /* create server configuration */ - NULL, /* merge server configuration */ - - NULL, /* create location configuration */ - NULL /* merge location configuration */ +static ngx_tcp_module_t ngx_tcp_upstream_keepalive_module_ctx = { + NULL, + ngx_tcp_upstream_keepalive_create_conf, + NULL, + NULL, + NULL }; ngx_module_t ngx_tcp_upstream_keepalive_module = { NGX_MODULE_V1, - &ngx_tcp_upstream_keepalive_module_ctx, /* module context */ - ngx_http_upstream_keepalive_commands, /* module directives */ - NGX_TCP_MODULE, /* module type */ - NULL, /* init master */ - NULL, /* init module */ - NULL, /* init process */ - NULL, /* init thread */ - NULL, /* exit thread */ - NULL, /* exit process */ - NULL, /* exit master */ + &ngx_tcp_upstream_keepalive_module_ctx, /* module context */ + ngx_tcp_upstream_keepalive_commands, /* module directives */ + NGX_TCP_MODULE, /* module type */ + NULL, /* init master */ + NULL, /* init module */ + NULL, /* init process */ + NULL, /* init thread */ + NULL, /* exit thread */ + NULL, /* exit process */ + NULL, /* exit master */ NGX_MODULE_V1_PADDING }; static ngx_int_t -ngx_http_upstream_init_keepalive(ngx_conf_t *cf, - ngx_http_upstream_srv_conf_t *us) +ngx_tcp_upstream_init_keepalive(ngx_conf_t *cf, + ngx_tcp_upstream_srv_conf_t *us) { ngx_uint_t i; - ngx_http_upstream_keepalive_srv_conf_t *kcf; - ngx_http_upstream_keepalive_cache_t *cached; + ngx_tcp_upstream_keepalive_srv_conf_t *kcf; + ngx_tcp_upstream_keepalive_cache_t *cached; ngx_log_debug0(NGX_LOG_DEBUG_HTTP, cf->log, 0, "init keepalive"); - kcf = ngx_http_conf_upstream_srv_conf(us, - ngx_tcp_upstream_keepalive_module); + kcf = ngx_tcp_conf_upstream_srv_conf(us, + ngx_tcp_upstream_keepalive_module); if (kcf->original_init_upstream(cf, us) != NGX_OK) { return NGX_ERROR; @@ -92,12 +86,12 @@ ngx_http_upstream_init_keepalive(ngx_conf_t *cf, kcf->original_init_peer = us->peer.init; - us->peer.init = ngx_http_upstream_init_keepalive_peer; + us->peer.init = ngx_tcp_upstream_init_keepalive_peer; /* allocate cache items and add to free queue */ cached = ngx_pcalloc(cf->pool, - sizeof(ngx_http_upstream_keepalive_cache_t) * kcf->max_cached); + sizeof(ngx_tcp_upstream_keepalive_cache_t) * kcf->max_cached); if (cached == NULL) { return NGX_ERROR; } @@ -115,19 +109,19 @@ ngx_http_upstream_init_keepalive(ngx_conf_t *cf, static ngx_int_t -ngx_http_upstream_init_keepalive_peer(ngx_http_request_t *r, - ngx_http_upstream_srv_conf_t *us) +ngx_tcp_upstream_init_keepalive_peer(ngx_tcp_session_t *r, + ngx_tcp_upstream_srv_conf_t *us) { - ngx_http_upstream_keepalive_peer_data_t *kp; - ngx_http_upstream_keepalive_srv_conf_t *kcf; + ngx_tcp_upstream_keepalive_peer_data_t *kp; + ngx_tcp_upstream_keepalive_srv_conf_t *kcf; ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, "init keepalive peer"); - kcf = ngx_http_conf_upstream_srv_conf(us, + kcf = ngx_tcp_conf_upstream_srv_conf(us, ngx_tcp_upstream_keepalive_module); - kp = ngx_palloc(r->pool, sizeof(ngx_http_upstream_keepalive_peer_data_t)); + kp = ngx_palloc(r->pool, sizeof(ngx_tcp_upstream_keepalive_peer_data_t)); if (kp == NULL) { return NGX_ERROR; } @@ -143,14 +137,14 @@ ngx_http_upstream_init_keepalive_peer(ngx_http_request_t *r, kp->original_free_peer = r->upstream->peer.free; r->upstream->peer.data = kp; - r->upstream->peer.get = ngx_http_upstream_get_keepalive_peer; - r->upstream->peer.free = ngx_http_upstream_free_keepalive_peer; + r->upstream->peer.get = ngx_tcp_upstream_get_keepalive_peer; + r->upstream->peer.free = ngx_tcp_upstream_free_keepalive_peer; #if (NGX_HTTP_SSL) kp->original_set_session = r->upstream->peer.set_session; kp->original_save_session = r->upstream->peer.save_session; - r->upstream->peer.set_session = ngx_http_upstream_keepalive_set_session; - r->upstream->peer.save_session = ngx_http_upstream_keepalive_save_session; + r->upstream->peer.set_session = ngx_tcp_upstream_keepalive_set_session; + r->upstream->peer.save_session = ngx_tcp_upstream_keepalive_save_session; #endif return NGX_OK; @@ -158,10 +152,10 @@ ngx_http_upstream_init_keepalive_peer(ngx_http_request_t *r, static ngx_int_t -ngx_http_upstream_get_keepalive_peer(ngx_peer_connection_t *pc, void *data) +ngx_tcp_upstream_get_keepalive_peer(ngx_peer_connection_t *pc, void *data) { - ngx_http_upstream_keepalive_peer_data_t *kp = data; - ngx_http_upstream_keepalive_cache_t *item; + ngx_tcp_upstream_keepalive_peer_data_t *kp = data; + ngx_tcp_upstream_keepalive_cache_t *item; ngx_int_t rc; ngx_queue_t *q, *cache; @@ -186,7 +180,7 @@ ngx_http_upstream_get_keepalive_peer(ngx_peer_connection_t *pc, void *data) q != ngx_queue_sentinel(cache); q = ngx_queue_next(q)) { - item = ngx_queue_data(q, ngx_http_upstream_keepalive_cache_t, queue); + item = ngx_queue_data(q, ngx_tcp_upstream_keepalive_cache_t, queue); c = item->connection; if (ngx_memn2cmp((u_char *) &item->sockaddr, (u_char *) pc->sockaddr, @@ -217,15 +211,15 @@ ngx_http_upstream_get_keepalive_peer(ngx_peer_connection_t *pc, void *data) static void -ngx_http_upstream_free_keepalive_peer(ngx_peer_connection_t *pc, void *data, +ngx_tcp_upstream_free_keepalive_peer(ngx_peer_connection_t *pc, void *data, ngx_uint_t state) { - ngx_http_upstream_keepalive_peer_data_t *kp = data; - ngx_http_upstream_keepalive_cache_t *item; + ngx_tcp_upstream_keepalive_peer_data_t *kp = data; + ngx_tcp_upstream_keepalive_cache_t *item; ngx_queue_t *q; ngx_connection_t *c; - ngx_http_upstream_t *u; + ngx_tcp_upstream_t *u; ngx_log_debug0(NGX_LOG_DEBUG_HTTP, pc->log, 0, "free keepalive peer"); @@ -262,15 +256,15 @@ ngx_http_upstream_free_keepalive_peer(ngx_peer_connection_t *pc, void *data, q = ngx_queue_last(&kp->conf->cache); ngx_queue_remove(q); - item = ngx_queue_data(q, ngx_http_upstream_keepalive_cache_t, queue); + item = ngx_queue_data(q, ngx_tcp_upstream_keepalive_cache_t, queue); - ngx_http_upstream_keepalive_close(item->connection); + ngx_tcp_upstream_keepalive_close(item->connection); } else { q = ngx_queue_head(&kp->conf->free); ngx_queue_remove(q); - item = ngx_queue_data(q, ngx_http_upstream_keepalive_cache_t, queue); + item = ngx_queue_data(q, ngx_tcp_upstream_keepalive_cache_t, queue); } item->connection = c; @@ -285,8 +279,8 @@ ngx_http_upstream_free_keepalive_peer(ngx_peer_connection_t *pc, void *data, ngx_del_timer(c->write); } - c->write->handler = ngx_http_upstream_keepalive_dummy_handler; - c->read->handler = ngx_http_upstream_keepalive_close_handler; + c->write->handler = ngx_tcp_upstream_keepalive_dummy_handler; + c->read->handler = ngx_tcp_upstream_keepalive_close_handler; c->data = item; c->idle = 1; @@ -299,7 +293,7 @@ ngx_http_upstream_free_keepalive_peer(ngx_peer_connection_t *pc, void *data, ngx_memcpy(&item->sockaddr, pc->sockaddr, pc->socklen); if (c->read->ready) { - ngx_http_upstream_keepalive_close_handler(c->read); + ngx_tcp_upstream_keepalive_close_handler(c->read); } invalid: @@ -309,7 +303,7 @@ ngx_http_upstream_free_keepalive_peer(ngx_peer_connection_t *pc, void *data, static void -ngx_http_upstream_keepalive_dummy_handler(ngx_event_t *ev) +ngx_tcp_upstream_keepalive_dummy_handler(ngx_event_t *ev) { ngx_log_debug0(NGX_LOG_DEBUG_HTTP, ev->log, 0, "keepalive dummy handler"); @@ -317,10 +311,10 @@ ngx_http_upstream_keepalive_dummy_handler(ngx_event_t *ev) static void -ngx_http_upstream_keepalive_close_handler(ngx_event_t *ev) +ngx_tcp_upstream_keepalive_close_handler(ngx_event_t *ev) { - ngx_http_upstream_keepalive_srv_conf_t *conf; - ngx_http_upstream_keepalive_cache_t *item; + ngx_tcp_upstream_keepalive_srv_conf_t *conf; + ngx_tcp_upstream_keepalive_cache_t *item; int n; char buf[1]; @@ -352,7 +346,7 @@ ngx_http_upstream_keepalive_close_handler(ngx_event_t *ev) item = c->data; conf = item->conf; - ngx_http_upstream_keepalive_close(c); + ngx_tcp_upstream_keepalive_close(c); ngx_queue_remove(&item->queue); ngx_queue_insert_head(&conf->free, &item->queue); @@ -360,7 +354,7 @@ ngx_http_upstream_keepalive_close_handler(ngx_event_t *ev) static void -ngx_http_upstream_keepalive_close(ngx_connection_t *c) +ngx_tcp_upstream_keepalive_close(ngx_connection_t *c) { #if (NGX_HTTP_SSL) @@ -370,7 +364,7 @@ ngx_http_upstream_keepalive_close(ngx_connection_t *c) c->ssl->no_send_shutdown = 1; if (ngx_ssl_shutdown(c) == NGX_AGAIN) { - c->ssl->handler = ngx_http_upstream_keepalive_close; + c->ssl->handler = ngx_tcp_upstream_keepalive_close; return; } } @@ -385,18 +379,18 @@ ngx_http_upstream_keepalive_close(ngx_connection_t *c) #if (NGX_HTTP_SSL) static ngx_int_t -ngx_http_upstream_keepalive_set_session(ngx_peer_connection_t *pc, void *data) +ngx_tcp_upstream_keepalive_set_session(ngx_peer_connection_t *pc, void *data) { - ngx_http_upstream_keepalive_peer_data_t *kp = data; + ngx_tcp_upstream_keepalive_peer_data_t *kp = data; return kp->original_set_session(pc, kp->data); } static void -ngx_http_upstream_keepalive_save_session(ngx_peer_connection_t *pc, void *data) +ngx_tcp_upstream_keepalive_save_session(ngx_peer_connection_t *pc, void *data) { - ngx_http_upstream_keepalive_peer_data_t *kp = data; + ngx_tcp_upstream_keepalive_peer_data_t *kp = data; kp->original_save_session(pc, kp->data); return; @@ -406,12 +400,12 @@ ngx_http_upstream_keepalive_save_session(ngx_peer_connection_t *pc, void *data) static void * -ngx_http_upstream_keepalive_create_conf(ngx_conf_t *cf) +ngx_tcp_upstream_keepalive_create_conf(ngx_conf_t *cf) { - ngx_http_upstream_keepalive_srv_conf_t *conf; + ngx_tcp_upstream_keepalive_srv_conf_t *conf; conf = ngx_pcalloc(cf->pool, - sizeof(ngx_http_upstream_keepalive_srv_conf_t)); + sizeof(ngx_tcp_upstream_keepalive_srv_conf_t)); if (conf == NULL) { return NULL; } @@ -430,16 +424,16 @@ ngx_http_upstream_keepalive_create_conf(ngx_conf_t *cf) static char * -ngx_http_upstream_keepalive(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) +ngx_tcp_upstream_keepalive(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) { - ngx_http_upstream_srv_conf_t *uscf; - ngx_http_upstream_keepalive_srv_conf_t *kcf = conf; + ngx_tcp_upstream_srv_conf_t *uscf; + ngx_tcp_upstream_keepalive_srv_conf_t *kcf = conf; ngx_int_t n; ngx_str_t *value; ngx_uint_t i; - uscf = ngx_http_conf_get_module_srv_conf(cf, ngx_http_upstream_module); + uscf = ngx_tcp_conf_get_module_srv_conf(cf, ngx_tcp_upstream_module); if (kcf->original_init_upstream) { return "is duplicate"; @@ -447,9 +441,9 @@ ngx_http_upstream_keepalive(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) kcf->original_init_upstream = uscf->peer.init_upstream ? uscf->peer.init_upstream - : ngx_http_upstream_init_round_robin; + : ngx_tcp_upstream_init_round_robin; - uscf->peer.init_upstream = ngx_http_upstream_init_keepalive; + uscf->peer.init_upstream = ngx_tcp_upstream_init_keepalive; /* read options */ diff --git a/ngx_tcp_upstream_keepalive.h b/ngx_tcp_upstream_keepalive.h index 4759ac9..9118e60 100644 --- a/ngx_tcp_upstream_keepalive.h +++ b/ngx_tcp_upstream_keepalive.h @@ -37,7 +37,7 @@ typedef struct { typedef struct { - ngx_http_upstream_keepalive_srv_conf_t *conf; + ngx_tcp_upstream_keepalive_srv_conf_t *conf; ngx_queue_t queue; ngx_connection_t *connection; @@ -45,6 +45,6 @@ typedef struct { socklen_t socklen; u_char sockaddr[NGX_SOCKADDRLEN]; -} ngx_http_upstream_keepalive_cache_t; +} ngx_tcp_upstream_keepalive_cache_t; #endif From 8f8cf80229923264a86672de138ac6a56ca4bd51 Mon Sep 17 00:00:00 2001 From: ideal Date: Thu, 17 Oct 2013 18:32:48 +0800 Subject: [PATCH 05/34] change tcp --- ngx_tcp_upstream.c | 6 ++++-- ngx_tcp_upstream.h | 2 ++ 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/ngx_tcp_upstream.c b/ngx_tcp_upstream.c index 2d1b419..c395f38 100644 --- a/ngx_tcp_upstream.c +++ b/ngx_tcp_upstream.c @@ -183,8 +183,10 @@ ngx_tcp_upstream_init(ngx_tcp_session_t *s) cln = ngx_tcp_cleanup_add(s, 0); cln->handler = ngx_tcp_upstream_cleanup; - cln->data = s; - u->cleanup = &cln->handler; + cln->data = s; + u->cleanup = &cln->handler; + + u->keepalive = 1 if (u->resolved == NULL) { diff --git a/ngx_tcp_upstream.h b/ngx_tcp_upstream.h index 4fdce67..f9a832b 100644 --- a/ngx_tcp_upstream.h +++ b/ngx_tcp_upstream.h @@ -160,6 +160,8 @@ struct ngx_tcp_upstream_s { ngx_tcp_upstream_resolved_t *resolved; ngx_tcp_upstream_state_t *state; ngx_tcp_cleanup_pt *cleanup; + + unsigned keepalive:1; }; From 5f7605623eb0d4d58eeb98dcf61f7ba6e681a287 Mon Sep 17 00:00:00 2001 From: ideal Date: Thu, 17 Oct 2013 18:33:30 +0800 Subject: [PATCH 06/34] =?UTF-8?q?=E5=9B=A7=EF=BC=8C=E5=B0=91=E4=BA=86?= =?UTF-8?q?=E4=B8=AA=E5=88=86=E5=8F=B7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ngx_tcp_upstream.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ngx_tcp_upstream.c b/ngx_tcp_upstream.c index c395f38..0ceb0ef 100644 --- a/ngx_tcp_upstream.c +++ b/ngx_tcp_upstream.c @@ -186,7 +186,7 @@ ngx_tcp_upstream_init(ngx_tcp_session_t *s) cln->data = s; u->cleanup = &cln->handler; - u->keepalive = 1 + u->keepalive = 1; if (u->resolved == NULL) { From a0ac70e4ce96fa48bf8ec15fc6f2014b32a50e5b Mon Sep 17 00:00:00 2001 From: ideal Date: Thu, 17 Oct 2013 18:40:21 +0800 Subject: [PATCH 07/34] ngx_tcp_upstream_keepalive_create_conf should be at create_srv_conf --- ngx_tcp_upstream_keepalive.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ngx_tcp_upstream_keepalive.c b/ngx_tcp_upstream_keepalive.c index 74367a4..c220540 100644 --- a/ngx_tcp_upstream_keepalive.c +++ b/ngx_tcp_upstream_keepalive.c @@ -43,9 +43,9 @@ static ngx_command_t ngx_tcp_upstream_keepalive_commands[] = { static ngx_tcp_module_t ngx_tcp_upstream_keepalive_module_ctx = { NULL, - ngx_tcp_upstream_keepalive_create_conf, NULL, NULL, + ngx_tcp_upstream_keepalive_create_conf, NULL }; From eabc02cc133c71132f85fa6823b64deb2b8d081a Mon Sep 17 00:00:00 2001 From: ideal Date: Thu, 17 Oct 2013 20:15:50 +0800 Subject: [PATCH 08/34] add module to config --- config | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/config b/config index 2fd811f..2ac7136 100644 --- a/config +++ b/config @@ -17,7 +17,7 @@ if [ $ngx_found = yes ]; then ngx_addon_name=ngx_tcp_module TCP_CORE_MODULES="ngx_tcp_module ngx_tcp_core_module ngx_tcp_upstream_module" - TCP_MODULES="ngx_tcp_proxy_module ngx_tcp_websocket_module ngx_tcp_upstream_ip_hash_module ngx_tcp_upstream_busyness_module" + TCP_MODULES="ngx_tcp_proxy_module ngx_tcp_websocket_module ngx_tcp_upstream_ip_hash_module ngx_tcp_upstream_busyness_module ngx_tcp_upstream_keepalive_module" NGX_ADDON_DEPS="$NGX_ADDON_DEPS $ngx_feature_deps $ngx_tcp_parser_deps" NGX_ADDON_SRCS="$NGX_ADDON_SRCS $ngx_tcp_src $ngx_tcp_parser_src" From fcfa12713398022ca1076575cba6a395f230939d Mon Sep 17 00:00:00 2001 From: ideal Date: Thu, 17 Oct 2013 20:41:05 +0800 Subject: [PATCH 09/34] remove eof --- ngx_tcp_upstream_keepalive.c | 1 - 1 file changed, 1 deletion(-) diff --git a/ngx_tcp_upstream_keepalive.c b/ngx_tcp_upstream_keepalive.c index c220540..aa8ee13 100644 --- a/ngx_tcp_upstream_keepalive.c +++ b/ngx_tcp_upstream_keepalive.c @@ -231,7 +231,6 @@ ngx_tcp_upstream_free_keepalive_peer(ngx_peer_connection_t *pc, void *data, if (state & NGX_PEER_FAILED || c == NULL - || c->read->eof || c->read->error || c->read->timedout || c->write->error From 213c7a08bad7c4af8a09e5fcba9942d99ddaf437 Mon Sep 17 00:00:00 2001 From: ideal Date: Thu, 17 Oct 2013 22:10:46 +0800 Subject: [PATCH 10/34] set upstream to keepalive in ngx_tcp_upstream_keepalive.c --- ngx_tcp_upstream.c | 4 ++-- ngx_tcp_upstream_keepalive.c | 4 +++- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/ngx_tcp_upstream.c b/ngx_tcp_upstream.c index 0ceb0ef..0250390 100644 --- a/ngx_tcp_upstream.c +++ b/ngx_tcp_upstream.c @@ -186,7 +186,7 @@ ngx_tcp_upstream_init(ngx_tcp_session_t *s) cln->data = s; u->cleanup = &cln->handler; - u->keepalive = 1; + u->keepalive = 0; if (u->resolved == NULL) { @@ -543,7 +543,7 @@ ngx_tcp_upstream_finalize_session(ngx_tcp_session_t *s, if (u->cleanup) { *u->cleanup = NULL; - u->cleanup = NULL; + u->cleanup = NULL; } if (u->state && u->state->response_sec) { diff --git a/ngx_tcp_upstream_keepalive.c b/ngx_tcp_upstream_keepalive.c index aa8ee13..21a61e7 100644 --- a/ngx_tcp_upstream_keepalive.c +++ b/ngx_tcp_upstream_keepalive.c @@ -70,7 +70,7 @@ static ngx_int_t ngx_tcp_upstream_init_keepalive(ngx_conf_t *cf, ngx_tcp_upstream_srv_conf_t *us) { - ngx_uint_t i; + ngx_uint_t i; ngx_tcp_upstream_keepalive_srv_conf_t *kcf; ngx_tcp_upstream_keepalive_cache_t *cached; @@ -140,6 +140,8 @@ ngx_tcp_upstream_init_keepalive_peer(ngx_tcp_session_t *r, r->upstream->peer.get = ngx_tcp_upstream_get_keepalive_peer; r->upstream->peer.free = ngx_tcp_upstream_free_keepalive_peer; + r->upstream->keepalive = 1; + #if (NGX_HTTP_SSL) kp->original_set_session = r->upstream->peer.set_session; kp->original_save_session = r->upstream->peer.save_session; From 382fa34e48c63340d7c0a8ba0a7eb9f6800b6aa6 Mon Sep 17 00:00:00 2001 From: ideal Date: Thu, 17 Oct 2013 22:27:33 +0800 Subject: [PATCH 11/34] do not close connection if keepalive --- ngx_tcp_upstream.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ngx_tcp_upstream.c b/ngx_tcp_upstream.c index 0250390..a9d125e 100644 --- a/ngx_tcp_upstream.c +++ b/ngx_tcp_upstream.c @@ -561,7 +561,7 @@ ngx_tcp_upstream_finalize_session(ngx_tcp_session_t *s, u->peer.check_index = NGX_INVALID_CHECK_INDEX; } - if (u->peer.connection) { + if (u->peer.connection && !u->keepalive) { ngx_log_debug1(NGX_LOG_DEBUG_TCP, s->connection->log, 0, "close tcp upstream connection: %d", From f9ba862cb0abe3a4440f137334cfb09dabf2375f Mon Sep 17 00:00:00 2001 From: ideal Date: Fri, 18 Oct 2013 16:46:49 +0800 Subject: [PATCH 12/34] ugly ignore quit --- modules/ngx_tcp_generic_proxy_module.c | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/modules/ngx_tcp_generic_proxy_module.c b/modules/ngx_tcp_generic_proxy_module.c index d2a81a9..9e4360a 100644 --- a/modules/ngx_tcp_generic_proxy_module.c +++ b/modules/ngx_tcp_generic_proxy_module.c @@ -398,6 +398,15 @@ ngx_tcp_proxy_handler(ngx_event_t *ev) if (size && dst->write->ready) { c->log->action = send_action; + /* TODO: move to somewhere */ + if (ngx_strncmp("quit", b->pos, 4) == 0) { + ngx_log_debug0(NGX_LOG_DEBUG_TCP, ev->log, 0, + "received quit, close session"); + + ngx_tcp_finalize_session(s); + return; + } + n = dst->send(dst, b->pos, size); err = ngx_socket_errno; From fbef10fa34f7fb2ce828a80cf88ce851e375670b Mon Sep 17 00:00:00 2001 From: ideal Date: Mon, 21 Oct 2013 10:46:27 +0800 Subject: [PATCH 13/34] change error handling when connecting to a tcp upstream --- ngx_tcp_upstream.c | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/ngx_tcp_upstream.c b/ngx_tcp_upstream.c index a9d125e..c9bc1e8 100644 --- a/ngx_tcp_upstream.c +++ b/ngx_tcp_upstream.c @@ -341,7 +341,7 @@ ngx_tcp_upstream_connect(ngx_tcp_session_t *s, ngx_tcp_upstream_t *u) ngx_log_debug1(NGX_LOG_DEBUG_TCP, s->connection->log, 0, "tcp upstream connect: %d", rc); - if (rc != NGX_OK && rc != NGX_AGAIN) { + if (rc == NGX_ERROR) { ngx_log_error(NGX_LOG_ERR, s->connection->log, 0, "upstream servers are busy or encounter error!"); @@ -352,7 +352,19 @@ ngx_tcp_upstream_connect(ngx_tcp_session_t *s, ngx_tcp_upstream_t *u) return; } - /* rc == NGX_OK or rc == NGX_AGAIN */ + if (rc == NGX_BUSY) { + ngx_log_error(NGX_LOG_ERR, s->connection->log, 0, + "no live upstreams"); + ngx_tcp_upstream_next(s, u, NGX_TCP_UPSTREAM_FT_NOLIVE); + return; + } + + if (rc == NGX_DECLINED) { + ngx_tcp_upstream_next(s, u, NGX_TCP_UPSTREAM_FT_ERROR); + return; + } + + /* rc == NGX_OK or rc == NGX_AGAIN or rc == NGX_DONE */ if (u->peer.check_index != NGX_INVALID_CHECK_INDEX) { ngx_tcp_check_get_peer(u->peer.check_index); From f924ffd52e7bea039a93bf5c14b0754ee404a212 Mon Sep 17 00:00:00 2001 From: ideal Date: Mon, 21 Oct 2013 16:34:16 +0800 Subject: [PATCH 14/34] add a server_type directive to specify the upstream type --- modules/ngx_tcp_generic_proxy_module.c | 67 ++++++++++++++++++-------- ngx_tcp_upstream.c | 8 +++ ngx_tcp_upstream.h | 2 + 3 files changed, 57 insertions(+), 20 deletions(-) diff --git a/modules/ngx_tcp_generic_proxy_module.c b/modules/ngx_tcp_generic_proxy_module.c index 9e4360a..a65ed8b 100644 --- a/modules/ngx_tcp_generic_proxy_module.c +++ b/modules/ngx_tcp_generic_proxy_module.c @@ -32,6 +32,16 @@ static void *ngx_tcp_proxy_create_conf(ngx_conf_t *cf); static char *ngx_tcp_proxy_merge_conf(ngx_conf_t *cf, void *parent, void *child); +static ngx_keyval_t ngx_tcp_server_types[] = { + + { ngx_string("redis"), + ngx_string("quit") + }, + + { ngx_null_string, + ngx_null_string + } +}; static ngx_tcp_protocol_t ngx_tcp_generic_protocol = { @@ -301,18 +311,19 @@ ngx_tcp_upstream_init_proxy_handler(ngx_tcp_session_t *s, ngx_tcp_upstream_t *u) static void ngx_tcp_proxy_handler(ngx_event_t *ev) { - char *action, *recv_action, *send_action; - off_t *read_bytes, *write_bytes; - size_t size; - ssize_t n; - ngx_buf_t *b; - ngx_err_t err; - ngx_uint_t do_write, first_read; - ngx_connection_t *c, *src, *dst; - ngx_tcp_session_t *s; - ngx_tcp_proxy_conf_t *pcf; - ngx_tcp_proxy_ctx_t *pctx; - ngx_tcp_core_srv_conf_t *cscf; + char *action, *recv_action, *send_action; + off_t *read_bytes, *write_bytes; + size_t size; + ssize_t n; + ngx_buf_t *b; + ngx_err_t err; + ngx_uint_t do_write, first_read, i; + ngx_connection_t *c, *src, *dst; + ngx_tcp_session_t *s; + ngx_tcp_proxy_conf_t *pcf; + ngx_tcp_proxy_ctx_t *pctx; + ngx_tcp_core_srv_conf_t *cscf; + ngx_tcp_upstream_srv_conf_t *uscf; c = ev->data; s = c->data; @@ -389,6 +400,9 @@ ngx_tcp_proxy_handler(ngx_event_t *ev) "tcp proxy handler: %d, #%d > #%d, time:%ui", do_write, src->fd, dst->fd, ngx_current_msec); + pcf = ngx_tcp_get_module_srv_conf(s, ngx_tcp_proxy_module); + uscf = pcf->upstream.upstream; + for ( ;; ) { if (do_write) { @@ -398,13 +412,28 @@ ngx_tcp_proxy_handler(ngx_event_t *ev) if (size && dst->write->ready) { c->log->action = send_action; - /* TODO: move to somewhere */ - if (ngx_strncmp("quit", b->pos, 4) == 0) { - ngx_log_debug0(NGX_LOG_DEBUG_TCP, ev->log, 0, - "received quit, close session"); + if (uscf->server_type.data) { + for (i = 0; ngx_tcp_server_types[i].key.data; i++) { + if (ngx_strncmp(uscf->server_type.data, + ngx_tcp_server_types[i].key.data, + ngx_min(uscf->server_type.len, + ngx_tcp_server_types[i].key.len)) == 0) { + break; + } - ngx_tcp_finalize_session(s); - return; + } + if (ngx_tcp_server_types[i].value.data + && ngx_strncasecmp(b->pos, + ngx_tcp_server_types[i].value.data, + ngx_tcp_server_types[i].value.len) == 0) { + ngx_log_debug0(NGX_LOG_DEBUG_TCP, ev->log, 0, + "received quit, close session"); + + ngx_tcp_finalize_session(s); + return; + } + } else { + s->upstream->keepalive = 0; } n = dst->send(dst, b->pos, size); @@ -510,8 +539,6 @@ ngx_tcp_proxy_handler(ngx_event_t *ev) return; } - pcf = ngx_tcp_get_module_srv_conf(s, ngx_tcp_proxy_module); - if (c == s->connection) { ngx_add_timer(c->read, cscf->timeout); } diff --git a/ngx_tcp_upstream.c b/ngx_tcp_upstream.c index c9bc1e8..94d93da 100644 --- a/ngx_tcp_upstream.c +++ b/ngx_tcp_upstream.c @@ -98,6 +98,13 @@ static ngx_command_t ngx_tcp_upstream_commands[] = { offsetof(ngx_tcp_upstream_main_conf_t, check_shm_size), NULL }, + { ngx_string("server_type"), + NGX_TCP_UPS_CONF|NGX_CONF_TAKE1, + ngx_conf_set_str_slot, + NGX_TCP_SRV_CONF_OFFSET, + offsetof(ngx_tcp_upstream_srv_conf_t, server_type), + NULL }, + ngx_null_command }; @@ -690,6 +697,7 @@ ngx_tcp_upstream_add(ngx_conf_t *cf, ngx_url_t *u, ngx_uint_t flags) uscf->no_port = u->no_port; #endif uscf->code.status_alive = 0; + uscf->server_type = (ngx_str_t) ngx_null_string; if (u->naddrs == 1) { uscf->servers = ngx_array_create(cf->pool, 1, diff --git a/ngx_tcp_upstream.h b/ngx_tcp_upstream.h index f9a832b..7188b4d 100644 --- a/ngx_tcp_upstream.h +++ b/ngx_tcp_upstream.h @@ -120,6 +120,8 @@ struct ngx_tcp_upstream_srv_conf_s { ngx_uint_t return_code; ngx_uint_t status_alive; } code; + + ngx_str_t server_type; }; From cb9000688f3ed8dffcddddea8a52a13dd2fe33a1 Mon Sep 17 00:00:00 2001 From: ideal Date: Fri, 1 Nov 2013 16:51:29 +0800 Subject: [PATCH 15/34] add document about ngx_tcp_upstream_keepalive_module --- README | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/README b/README index 3b5a577..791ec98 100644 --- a/README +++ b/README @@ -242,6 +242,30 @@ Directives (). Default port is 80. + server_type + syntax: *server_type parameter* + + default: *none* + + context: *upstream* + + description: Specify the server type, currently only "redis" is + supported. + + keepalive + syntax: *keepalive num [single]* + + default: *none* + + context: *upstream* + + description: Enables keep-alive connections for the upstream. Num + specifies the max number of connections to keep open before, if + the max is reached it will close the least recently used connections. + + Single treats everything as a single host. With this flag connections + to different backends are treated as equal. + check syntax: *check interval=milliseconds [fall=count] [rise=count] [timeout=milliseconds] [type=tcp|ssl_hello|smtp|mysql|pop3|imap]* From b66dd187c76858fe5209cb8d130548cee7a10b59 Mon Sep 17 00:00:00 2001 From: ideal Date: Fri, 15 Nov 2013 17:33:40 +0800 Subject: [PATCH 16/34] add monitor server module --- config | 4 +- modules/ngx_tcp_monitor_server_module.c | 589 ++++++++++++++++++++++++ 2 files changed, 591 insertions(+), 2 deletions(-) create mode 100644 modules/ngx_tcp_monitor_server_module.c diff --git a/config b/config index 2ac7136..977e576 100644 --- a/config +++ b/config @@ -4,7 +4,7 @@ ngx_feature_run=no ngx_feature_incs= ngx_feature_path="$ngx_addon_dir/modules $ngx_addon_dir/parsers $ngx_addon_dir" ngx_feature_deps="$ngx_addon_dir/ngx_tcp.h $ngx_addon_dir/ngx_tcp_session.h $ngx_addon_dir/ngx_tcp_upstream.h $ngx_addon_dir/ngx_tcp_upstream_check.h $ngx_addon_dir/ngx_tcp_upstream_round_robin.h $ngx_addon_dir/ngx_tcp_upstream_keepalive.h" -ngx_tcp_src="$ngx_addon_dir/ngx_tcp.c $ngx_addon_dir/ngx_tcp_core_module.c $ngx_addon_dir/ngx_tcp_session.c $ngx_addon_dir/ngx_tcp_access.c $ngx_addon_dir/ngx_tcp_log.c $ngx_addon_dir/ngx_tcp_upstream.c $ngx_addon_dir/ngx_tcp_upstream_round_robin.c $ngx_addon_dir/modules/ngx_tcp_generic_proxy_module.c $ngx_addon_dir/modules/ngx_tcp_websocket_proxy_module.c $ngx_addon_dir/modules/ngx_tcp_upstream_ip_hash_module.c $ngx_addon_dir/modules/ngx_tcp_upstream_busyness_module.c $ngx_addon_dir/ngx_tcp_upstream_check.c $ngx_addon_dir/ngx_tcp_upstream_keepalive.c" +ngx_tcp_src="$ngx_addon_dir/ngx_tcp.c $ngx_addon_dir/ngx_tcp_core_module.c $ngx_addon_dir/ngx_tcp_session.c $ngx_addon_dir/ngx_tcp_access.c $ngx_addon_dir/ngx_tcp_log.c $ngx_addon_dir/ngx_tcp_upstream.c $ngx_addon_dir/ngx_tcp_upstream_round_robin.c $ngx_addon_dir/modules/ngx_tcp_generic_proxy_module.c $ngx_addon_dir/modules/ngx_tcp_websocket_proxy_module.c $ngx_addon_dir/modules/ngx_tcp_upstream_ip_hash_module.c $ngx_addon_dir/modules/ngx_tcp_upstream_busyness_module.c $ngx_addon_dir/ngx_tcp_upstream_check.c $ngx_addon_dir/ngx_tcp_upstream_keepalive.c $ngx_addon_dir/modules/ngx_tcp_monitor_server_module.c" ngx_tcp_ssl_deps="$ngx_addon_dir/modules/ngx_tcp_ssl_module.h" ngx_tcp_ssl_src="$ngx_addon_dir/modules/ngx_tcp_ssl_module.c" ngx_tcp_parser_deps="$ngx_addon_dir/parsers/parser.h $ngx_addon_dir/parsers/http_request_parser.h $ngx_addon_dir/parsers/http_response_parser.h $ngx_addon_dir/parsers/smtp_response_parser.h" @@ -17,7 +17,7 @@ if [ $ngx_found = yes ]; then ngx_addon_name=ngx_tcp_module TCP_CORE_MODULES="ngx_tcp_module ngx_tcp_core_module ngx_tcp_upstream_module" - TCP_MODULES="ngx_tcp_proxy_module ngx_tcp_websocket_module ngx_tcp_upstream_ip_hash_module ngx_tcp_upstream_busyness_module ngx_tcp_upstream_keepalive_module" + TCP_MODULES="ngx_tcp_proxy_module ngx_tcp_websocket_module ngx_tcp_upstream_ip_hash_module ngx_tcp_upstream_busyness_module ngx_tcp_upstream_keepalive_module ngx_tcp_monitor_module" NGX_ADDON_DEPS="$NGX_ADDON_DEPS $ngx_feature_deps $ngx_tcp_parser_deps" NGX_ADDON_SRCS="$NGX_ADDON_SRCS $ngx_tcp_src $ngx_tcp_parser_src" diff --git a/modules/ngx_tcp_monitor_server_module.c b/modules/ngx_tcp_monitor_server_module.c new file mode 100644 index 0000000..5563e77 --- /dev/null +++ b/modules/ngx_tcp_monitor_server_module.c @@ -0,0 +1,589 @@ +/* + * Copyright (C) 2013 Shang Yuanchun + * + */ + +#include +#include +#include + + +typedef struct ngx_tcp_monitor_s { + ngx_peer_connection_t *upstream; + ngx_buf_t *buffer; +} ngx_tcp_monitor_ctx_t; + + +typedef struct ngx_tcp_monitor_conf_s { + ngx_tcp_upstream_conf_t upstream; + + ngx_str_t url; + size_t buffer_size; +} ngx_tcp_monitor_conf_t; + + +static void ngx_tcp_monitor_init_session(ngx_tcp_session_t *s); +static void ngx_tcp_monitor_init_upstream(ngx_connection_t *c, + ngx_tcp_session_t *s); +static void ngx_tcp_upstream_init_monitor_handler(ngx_tcp_session_t *s, + ngx_tcp_upstream_t *u); +static char *ngx_tcp_monitor_pass(ngx_conf_t *cf, ngx_command_t *cmd, void *conf); +static void ngx_tcp_monitor_dummy_read_handler(ngx_event_t *ev); +static void ngx_tcp_monitor_dummy_write_handler(ngx_event_t *ev); +static void ngx_tcp_monitor_handler(ngx_event_t *ev); +static void *ngx_tcp_monitor_create_conf(ngx_conf_t *cf); +static char *ngx_tcp_monitor_merge_conf(ngx_conf_t *cf, void *parent, + void *child); + +static ngx_tcp_protocol_t ngx_tcp_generic_protocol = { + + ngx_string("monitor_server"), + { 0, 0, 0, 0 }, + NGX_TCP_GENERIC_PROTOCOL, + ngx_tcp_monitor_init_session, + NULL, + NULL, + ngx_string("500 Internal server error" CRLF) + +}; + + +static ngx_command_t ngx_tcp_monitor_commands[] = { + + { ngx_string("monitor_pass"), + NGX_TCP_MAIN_CONF|NGX_TCP_SRV_CONF|NGX_CONF_TAKE1, + ngx_tcp_monitor_pass, + NGX_TCP_SRV_CONF_OFFSET, + 0, + NULL }, + + { ngx_string("monitor_buffer"), + NGX_TCP_MAIN_CONF|NGX_TCP_SRV_CONF|NGX_CONF_TAKE1, + ngx_conf_set_size_slot, + NGX_TCP_SRV_CONF_OFFSET, + offsetof(ngx_tcp_monitor_conf_t, buffer_size), + NULL }, + + { ngx_string("monitor_connect_timeout"), + NGX_TCP_MAIN_CONF|NGX_TCP_SRV_CONF|NGX_CONF_TAKE1, + ngx_conf_set_msec_slot, + NGX_TCP_SRV_CONF_OFFSET, + offsetof(ngx_tcp_monitor_conf_t, upstream.connect_timeout), + NULL }, + + { ngx_string("monitor_read_timeout"), + NGX_TCP_MAIN_CONF|NGX_TCP_SRV_CONF|NGX_CONF_TAKE1, + ngx_conf_set_msec_slot, + NGX_TCP_SRV_CONF_OFFSET, + offsetof(ngx_tcp_monitor_conf_t, upstream.read_timeout), + NULL }, + + { ngx_string("monitor_send_timeout"), + NGX_TCP_MAIN_CONF|NGX_TCP_SRV_CONF|NGX_CONF_TAKE1, + ngx_conf_set_msec_slot, + NGX_TCP_SRV_CONF_OFFSET, + offsetof(ngx_tcp_monitor_conf_t, upstream.send_timeout), + NULL }, + + ngx_null_command +}; + + +static ngx_tcp_module_t ngx_tcp_monitor_module_ctx = { + &ngx_tcp_generic_protocol, /* protocol */ + + NULL, /* create main configuration */ + NULL, /* init main configuration */ + + ngx_tcp_monitor_create_conf, /* create server configuration */ + ngx_tcp_monitor_merge_conf /* merge server configuration */ +}; + + +ngx_module_t ngx_tcp_monitor_module = { + NGX_MODULE_V1, + &ngx_tcp_monitor_module_ctx, /* module context */ + ngx_tcp_monitor_commands, /* module directives */ + NGX_TCP_MODULE, /* module type */ + NULL, /* init master */ + NULL, /* init module */ + NULL, /* init process */ + NULL, /* init thread */ + NULL, /* exit thread */ + NULL, /* exit process */ + NULL, /* exit master */ + NGX_MODULE_V1_PADDING +}; + + +static void +ngx_tcp_monitor_init_session(ngx_tcp_session_t *s) +{ + ngx_connection_t *c; + ngx_tcp_monitor_conf_t *pcf; + ngx_tcp_core_srv_conf_t *cscf; + + c = s->connection; + + ngx_log_debug0(NGX_LOG_DEBUG_TCP, c->log, 0, "tcp monitor init session"); + + cscf = ngx_tcp_get_module_srv_conf(s, ngx_tcp_core_module); + + pcf = ngx_tcp_get_module_srv_conf(s, ngx_tcp_monitor_module); + + s->buffer = ngx_create_temp_buf(s->connection->pool, pcf->buffer_size); + if (s->buffer == NULL) { + ngx_tcp_finalize_session(s); + return; + } + + s->out.len = 0; + + c->write->handler = ngx_tcp_monitor_dummy_write_handler; + c->read->handler = ngx_tcp_monitor_dummy_read_handler; + + ngx_add_timer(c->read, cscf->timeout); + + ngx_tcp_monitor_init_upstream(c, s); + + return; +} + + +static void +ngx_tcp_monitor_dummy_write_handler(ngx_event_t *wev) +{ + ngx_connection_t *c; + ngx_tcp_session_t *s; + + c = wev->data; + s = c->data; + + ngx_log_debug1(NGX_LOG_DEBUG_TCP, wev->log, 0, + "tcp monitor dummy write handler: %d", c->fd); + + if (ngx_handle_write_event(wev, 0) != NGX_OK) { + ngx_tcp_finalize_session(s); + } +} + + +static void +ngx_tcp_monitor_dummy_read_handler(ngx_event_t *rev) +{ + ngx_connection_t *c; + ngx_tcp_session_t *s; + + c = rev->data; + s = c->data; + + ngx_log_debug1(NGX_LOG_DEBUG_TCP, rev->log, 0, + "tcp monitor dummy read handler: %d", c->fd); + + if (ngx_handle_read_event(rev, 0) != NGX_OK) { + ngx_tcp_finalize_session(s); + } +} + + +static void +ngx_tcp_monitor_init_upstream(ngx_connection_t *c, ngx_tcp_session_t *s) +{ + ngx_tcp_upstream_t *u; + ngx_tcp_monitor_ctx_t *p; + ngx_tcp_monitor_conf_t *pcf; + + s->connection->log->action = "ngx_tcp_monitor_init_upstream"; + + pcf = ngx_tcp_get_module_srv_conf(s, ngx_tcp_monitor_module); + if (pcf->upstream.upstream == NULL) { + ngx_tcp_finalize_session(s); + return; + } + + p = ngx_pcalloc(s->connection->pool, sizeof(ngx_tcp_monitor_ctx_t)); + if (p == NULL) { + ngx_tcp_finalize_session(s); + return; + } + + ngx_tcp_set_ctx(s, p, ngx_tcp_monitor_module); + + if (ngx_tcp_upstream_create(s) != NGX_OK) { + ngx_tcp_finalize_session(s); + return; + } + + u = s->upstream; + + u->conf = &pcf->upstream; + + u->write_event_handler = ngx_tcp_upstream_init_monitor_handler; + u->read_event_handler = ngx_tcp_upstream_init_monitor_handler; + + p->upstream = &u->peer; + + p->buffer = ngx_create_temp_buf(s->connection->pool, pcf->buffer_size); + if (p->buffer == NULL) { + ngx_tcp_finalize_session(s); + return; + } + + ngx_tcp_upstream_init(s); + + return; +} + + +static void +ngx_tcp_upstream_init_monitor_handler(ngx_tcp_session_t *s, ngx_tcp_upstream_t *u) +{ + ngx_connection_t *c; + ngx_tcp_monitor_ctx_t *pctx; + ngx_tcp_monitor_conf_t *pcf; + + c = s->connection; + c->log->action = "ngx_tcp_upstream_init_monitor_handler"; + + ngx_log_debug0(NGX_LOG_DEBUG_TCP, s->connection->log, 0, + "tcp monitor upstream init monitor"); + + pcf = ngx_tcp_get_module_srv_conf(s, ngx_tcp_monitor_module); + + pctx = ngx_tcp_get_module_ctx(s, ngx_tcp_monitor_module); + + if (pcf == NULL || pctx == NULL) { + ngx_tcp_finalize_session(s); + return; + } + + pctx->upstream = &s->upstream->peer; + + c = pctx->upstream->connection; + if (c->read->timedout || c->write->timedout) { + ngx_tcp_upstream_next(s, u, NGX_TCP_UPSTREAM_FT_TIMEOUT); + return; + } + + if (ngx_tcp_upstream_check_broken_connection(s) != NGX_OK){ + ngx_tcp_upstream_next(s, u, NGX_TCP_UPSTREAM_FT_ERROR); + return; + } + + s->connection->read->handler = ngx_tcp_monitor_handler; + s->connection->write->handler = ngx_tcp_monitor_handler; + + c->read->handler = ngx_tcp_monitor_handler; + c->write->handler = ngx_tcp_monitor_handler; + + ngx_add_timer(c->read, pcf->upstream.read_timeout); + ngx_add_timer(c->write, pcf->upstream.send_timeout); + + if (ngx_handle_read_event(s->connection->read, 0) != NGX_OK) { + ngx_tcp_finalize_session(s); + return; + } + + return; +} + + +static void +ngx_tcp_monitor_handler(ngx_event_t *ev) +{ + char *action, *recv_action, *send_action; + off_t *read_bytes, *write_bytes; + size_t size; + ssize_t n; + ngx_buf_t *b; + ngx_err_t err; + ngx_uint_t do_write; + ngx_connection_t *c, *src, *dst; + ngx_tcp_session_t *s; + ngx_tcp_monitor_conf_t *pcf; + ngx_tcp_monitor_ctx_t *pctx; + ngx_tcp_core_srv_conf_t *cscf; + + c = ev->data; + s = c->data; + + cscf = ngx_tcp_get_module_srv_conf(s, ngx_tcp_core_module); + + if (ev->timedout) { + c->log->action = "monitoring"; + + ngx_log_error(NGX_LOG_INFO, c->log, NGX_ETIMEDOUT, "monitor timed out"); + c->timedout = 1; + + ngx_tcp_finalize_session(s); + return; + } + + pctx = ngx_tcp_get_module_ctx(s, ngx_tcp_monitor_module); + + if (pctx == NULL) { + ngx_tcp_finalize_session(s); + return; + } + + read_bytes = NULL; + write_bytes = NULL; + + if (c == s->connection) { + if (ev->write) { + recv_action = "client write: monitoring and reading from upstream"; + send_action = "client write: monitoring and sending to client"; + src = pctx->upstream->connection; + dst = c; + b = pctx->buffer; + write_bytes = &s->bytes_write; + } else { + recv_action = "client read: monitoring and reading from client"; + send_action = "client read: monitoring and sending to upstream"; + src = c; + dst = pctx->upstream->connection; + b = s->buffer; + read_bytes = &s->bytes_read; + } + + } else { + if (ev->write) { + recv_action = "upstream write: monitoring and reading from client"; + send_action = "upstream write: monitoring and sending to upstream"; + src = s->connection; + dst = c; + b = s->buffer; + read_bytes = &s->bytes_read; + } else { + recv_action = "upstream read: monitoring and reading from upstream"; + send_action = "upstream read: monitoring and sending to client"; + src = c; + dst = s->connection; + b = pctx->buffer; + write_bytes = &s->bytes_write; + } + } + + do_write = ev->write ? 1 : 0; + + ngx_log_debug4(NGX_LOG_DEBUG_TCP, ev->log, 0, + "tcp monitor handler: %d, #%d > #%d, time:%ui", + do_write, src->fd, dst->fd, ngx_current_msec); + + pcf = ngx_tcp_get_module_srv_conf(s, ngx_tcp_monitor_module); + + for ( ;; ) { + + if (do_write) { + + size = b->last - b->pos; + + if (size && dst->write->ready) { + c->log->action = send_action; + + n = dst->send(dst, b->pos, size); + err = ngx_socket_errno; + + ngx_log_debug1(NGX_LOG_DEBUG_TCP, ev->log, 0, + "tcp monitor handler send:%d", n); + + if (n == NGX_ERROR) { + ngx_log_error(NGX_LOG_ERR, c->log, err, "monitor send error"); + + ngx_tcp_finalize_session(s); + return; + } + + if (n > 0) { + b->pos += n; + + if (write_bytes) { + *write_bytes += n; + } + + if (b->pos == b->last) { + b->pos = b->start; + b->last = b->start; + } + } + } + } + + size = b->end - b->last; + + if (size) { + if (src->read->ready) { + + c->log->action = recv_action; + + n = src->recv(src, b->last, size); + err = ngx_socket_errno; + + ngx_log_debug1(NGX_LOG_DEBUG_TCP, ev->log, 0, + "tcp monitor handler recv:%d", n); + + if (n == NGX_AGAIN || n == 0) { + break; + } + + if (n > 0) { + do_write = 1; + b->last += n; + + if (read_bytes) { + *read_bytes += n; + } + + continue; + } + + if (n == NGX_ERROR) { + src->read->eof = 1; + } + } + } + + break; + } + + c->log->action = "nginx tcp monitoring"; + + if ((s->connection->read->eof && s->buffer->pos == s->buffer->last) + || (pctx->upstream->connection->read->eof + && pctx->buffer->pos == pctx->buffer->last) + || (s->connection->read->eof + && pctx->upstream->connection->read->eof)) + { + action = c->log->action; + c->log->action = NULL; + ngx_log_error(NGX_LOG_DEBUG, c->log, 0, "monitor session done"); + c->log->action = action; + + ngx_tcp_finalize_session(s); + return; + } + + if (ngx_handle_write_event(dst->write, 0) != NGX_OK) { + ngx_tcp_finalize_session(s); + return; + } + + if (ngx_handle_read_event(dst->read, 0) != NGX_OK) { + ngx_tcp_finalize_session(s); + return; + } + + if (ngx_handle_write_event(src->write, 0) != NGX_OK) { + ngx_tcp_finalize_session(s); + return; + } + + if (ngx_handle_read_event(src->read, 0) != NGX_OK) { + ngx_tcp_finalize_session(s); + return; + } + + if (c == s->connection) { + ngx_add_timer(c->read, cscf->timeout); + } + + if (c == pctx->upstream->connection) { + if (ev->write) { + ngx_add_timer(c->write, pcf->upstream.send_timeout); + } else { + ngx_add_timer(c->read, pcf->upstream.read_timeout); + } + } + + return; +} + + +static char * +ngx_tcp_monitor_pass(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) +{ + ngx_tcp_monitor_conf_t *pcf = conf; + + u_short port = 80; + ngx_str_t *value, *url = &pcf->url; + ngx_url_t u; + ngx_tcp_core_srv_conf_t *cscf; + + cscf = ngx_tcp_conf_get_module_srv_conf(cf, ngx_tcp_core_module); + + if (cscf->protocol && ngx_strncmp(cscf->protocol->name.data, + (u_char *)"tcp_generic", + sizeof("tcp_generic") - 1) != 0) { + + return "the protocol should be tcp_generic"; + } + + if (cscf->protocol == NULL) { + cscf->protocol = &ngx_tcp_generic_protocol; + } + + if (pcf->upstream.upstream) { + return "is duplicate"; + } + + value = cf->args->elts; + + url = &value[1]; + + ngx_memzero(&u, sizeof(u)); + + u.url.len = url->len; + u.url.data = url->data; + u.default_port = port; + u.uri_part = 1; + u.no_resolve = 1; + + pcf->upstream.upstream = ngx_tcp_upstream_add(cf, &u, 0); + if (pcf->upstream.upstream == NULL) { + return NGX_CONF_ERROR; + } + + return NGX_CONF_OK; +} + + +static void * +ngx_tcp_monitor_create_conf(ngx_conf_t *cf) +{ + ngx_tcp_monitor_conf_t *pcf; + + pcf = ngx_pcalloc(cf->pool, sizeof(ngx_tcp_monitor_conf_t)); + if (pcf == NULL) { + return NULL; + } + + pcf->buffer_size = NGX_CONF_UNSET_SIZE; + + pcf->upstream.connect_timeout = NGX_CONF_UNSET_MSEC; + pcf->upstream.send_timeout = NGX_CONF_UNSET_MSEC; + pcf->upstream.read_timeout = NGX_CONF_UNSET_MSEC; + + return pcf; +} + + +static char * +ngx_tcp_monitor_merge_conf(ngx_conf_t *cf, void *parent, void *child) +{ + ngx_tcp_monitor_conf_t *prev = parent; + ngx_tcp_monitor_conf_t *conf = child; + + ngx_conf_merge_size_value(conf->buffer_size, prev->buffer_size, + (size_t) ngx_pagesize); + + ngx_conf_merge_msec_value(conf->upstream.connect_timeout, + prev->upstream.connect_timeout, 60000); + + ngx_conf_merge_msec_value(conf->upstream.send_timeout, + prev->upstream.send_timeout, 60000); + + ngx_conf_merge_msec_value(conf->upstream.read_timeout, + prev->upstream.read_timeout, 60000); + + return NGX_CONF_OK; +} From f9c0bcdec71c15c88786edaf0f04b452d656d277 Mon Sep 17 00:00:00 2001 From: ideal Date: Mon, 18 Nov 2013 15:20:41 +0800 Subject: [PATCH 17/34] add header definition --- modules/ngx_tcp_monitor_server_module.c | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/modules/ngx_tcp_monitor_server_module.c b/modules/ngx_tcp_monitor_server_module.c index 5563e77..6504778 100644 --- a/modules/ngx_tcp_monitor_server_module.c +++ b/modules/ngx_tcp_monitor_server_module.c @@ -7,6 +7,27 @@ #include #include +/* + * header: |---- 4 ----|-- 2 --|-- 2 --| + * length type padding + * + */ +typedef struct ngx_tcp_monitor_header_s { + uint32_t length; + uint16_t type; + uint16_t spare0; +} ngx_tcp_monitor_header_t; + +#define HEADER_LENGTH sizeof(ngx_tcp_monitor_header_t) + +#define monitor_packet_size(ptr) (*(u_char *)(ptr) + \ + (*((u_char *)(ptr) + 1) << 8) + \ + (*((u_char *)(ptr) + 2) << 16) + \ + (*((u_char *)(ptr) + 3) << 24) ) + +#define MONITOR_TYPE_OFFSET offsetof(ngx_tcp_monitor_header_t, type) +#define monitor_type(ptr) (*((u_char *)(ptr) + MONITOR_TYPE_OFFSET) + \ + (*((u_char *)(ptr) + MONITOR_TYPE_OFFSET + 1) << 8) ) typedef struct ngx_tcp_monitor_s { ngx_peer_connection_t *upstream; From 87f4cb6d2e9fa7933c61407e94448d8bd4694549 Mon Sep 17 00:00:00 2001 From: ideal Date: Fri, 22 Nov 2013 17:18:39 +0800 Subject: [PATCH 18/34] update handler --- modules/ngx_tcp_monitor_server_module.c | 90 ++++++++++++++++++++----- 1 file changed, 74 insertions(+), 16 deletions(-) diff --git a/modules/ngx_tcp_monitor_server_module.c b/modules/ngx_tcp_monitor_server_module.c index 6504778..a44d1b1 100644 --- a/modules/ngx_tcp_monitor_server_module.c +++ b/modules/ngx_tcp_monitor_server_module.c @@ -11,6 +11,8 @@ * header: |---- 4 ----|-- 2 --|-- 2 --| * length type padding * + * all are little endian + * */ typedef struct ngx_tcp_monitor_header_s { uint32_t length; @@ -49,9 +51,13 @@ static void ngx_tcp_monitor_init_upstream(ngx_connection_t *c, static void ngx_tcp_upstream_init_monitor_handler(ngx_tcp_session_t *s, ngx_tcp_upstream_t *u); static char *ngx_tcp_monitor_pass(ngx_conf_t *cf, ngx_command_t *cmd, void *conf); -static void ngx_tcp_monitor_dummy_read_handler(ngx_event_t *ev); -static void ngx_tcp_monitor_dummy_write_handler(ngx_event_t *ev); +static void ngx_tcp_monitor_client_read_handler(ngx_event_t *rev); +static void ngx_tcp_monitor_client_write_handler(ngx_event_t *wev); +static void ngx_tcp_monitor_upstream_read_handler(ngx_event_t *rev); +static void ngx_tcp_monitor_upstream_write_handler(ngx_event_t *wev); +#if 0 static void ngx_tcp_monitor_handler(ngx_event_t *ev); +#endif static void *ngx_tcp_monitor_create_conf(ngx_conf_t *cf); static char *ngx_tcp_monitor_merge_conf(ngx_conf_t *cf, void *parent, void *child); @@ -160,19 +166,43 @@ ngx_tcp_monitor_init_session(ngx_tcp_session_t *s) s->out.len = 0; - c->write->handler = ngx_tcp_monitor_dummy_write_handler; - c->read->handler = ngx_tcp_monitor_dummy_read_handler; + c->write->handler = ngx_tcp_monitor_client_write_handler; + c->read->handler = ngx_tcp_monitor_client_read_handler; ngx_add_timer(c->read, cscf->timeout); - ngx_tcp_monitor_init_upstream(c, s); + // We will call this after we receive data completely + // ngx_tcp_monitor_init_upstream(c, s); + + if (ngx_handle_read_event(c->read, 0) != NGX_OK) { + ngx_tcp_finalize_session(s); + return; + } return; } static void -ngx_tcp_monitor_dummy_write_handler(ngx_event_t *wev) +ngx_tcp_monitor_client_read_handler(ngx_event_t *rev) +{ + ngx_connection_t *c; + ngx_tcp_session_t *s; + + c = rev->data; + s = c->data; + + ngx_log_debug1(NGX_LOG_DEBUG_TCP, rev->log, 0, + "tcp monitor client read handler: %d", c->fd); + + if (ngx_handle_read_event(rev, 0) != NGX_OK) { + ngx_tcp_finalize_session(s); + } +} + + +static void +ngx_tcp_monitor_client_write_handler(ngx_event_t *wev) { ngx_connection_t *c; ngx_tcp_session_t *s; @@ -181,7 +211,7 @@ ngx_tcp_monitor_dummy_write_handler(ngx_event_t *wev) s = c->data; ngx_log_debug1(NGX_LOG_DEBUG_TCP, wev->log, 0, - "tcp monitor dummy write handler: %d", c->fd); + "tcp monitor client write handler: %d", c->fd); if (ngx_handle_write_event(wev, 0) != NGX_OK) { ngx_tcp_finalize_session(s); @@ -189,8 +219,7 @@ ngx_tcp_monitor_dummy_write_handler(ngx_event_t *wev) } -static void -ngx_tcp_monitor_dummy_read_handler(ngx_event_t *rev) +static void ngx_tcp_monitor_upstream_read_handler(ngx_event_t *rev) { ngx_connection_t *c; ngx_tcp_session_t *s; @@ -199,11 +228,31 @@ ngx_tcp_monitor_dummy_read_handler(ngx_event_t *rev) s = c->data; ngx_log_debug1(NGX_LOG_DEBUG_TCP, rev->log, 0, - "tcp monitor dummy read handler: %d", c->fd); + "tcp monitor upstream read handler: %d", c->fd); if (ngx_handle_read_event(rev, 0) != NGX_OK) { ngx_tcp_finalize_session(s); } + + // TODO: add condition + ngx_tcp_monitor_init_upstream(c, s); +} + + +static void ngx_tcp_monitor_upstream_write_handler(ngx_event_t *wev) +{ + ngx_connection_t *c; + ngx_tcp_session_t *s; + + c = wev->data; + s = c->data; + + ngx_log_debug1(NGX_LOG_DEBUG_TCP, wev->log, 0, + "tcp monitor upstream read handler: %d", c->fd); + + if (ngx_handle_write_event(wev, 0) != NGX_OK) { + ngx_tcp_finalize_session(s); + } } @@ -291,16 +340,13 @@ ngx_tcp_upstream_init_monitor_handler(ngx_tcp_session_t *s, ngx_tcp_upstream_t * return; } - s->connection->read->handler = ngx_tcp_monitor_handler; - s->connection->write->handler = ngx_tcp_monitor_handler; - - c->read->handler = ngx_tcp_monitor_handler; - c->write->handler = ngx_tcp_monitor_handler; + c->read->handler = ngx_tcp_monitor_upstream_read_handler; + c->write->handler = ngx_tcp_monitor_upstream_write_handler; ngx_add_timer(c->read, pcf->upstream.read_timeout); ngx_add_timer(c->write, pcf->upstream.send_timeout); - if (ngx_handle_read_event(s->connection->read, 0) != NGX_OK) { + if (ngx_handle_write_event(c->write, 0) != NGX_OK) { ngx_tcp_finalize_session(s); return; } @@ -309,6 +355,7 @@ ngx_tcp_upstream_init_monitor_handler(ngx_tcp_session_t *s, ngx_tcp_upstream_t * } +#if 0 static void ngx_tcp_monitor_handler(ngx_event_t *ev) { @@ -385,6 +432,16 @@ ngx_tcp_monitor_handler(ngx_event_t *ev) } } + // FIXME: this is debug code + int packet_size = 0; + if (c == s->connection && src == c) { + printf("packet_size: %d\n", packet_size); + if (*read_bytes >= (off_t)HEADER_LENGTH) { + packet_size = monitor_packet_size(b->pos); + printf("packet_size: %d\n", packet_size); + } + } + do_write = ev->write ? 1 : 0; ngx_log_debug4(NGX_LOG_DEBUG_TCP, ev->log, 0, @@ -518,6 +575,7 @@ ngx_tcp_monitor_handler(ngx_event_t *ev) return; } +#endif static char * From a22dc02a860af326a62139d06f8e5b216042d930 Mon Sep 17 00:00:00 2001 From: ideal Date: Mon, 25 Nov 2013 15:48:16 +0800 Subject: [PATCH 19/34] add header and request_body for each connection --- modules/ngx_tcp_monitor_server_module.c | 50 ++++++++++--------------- 1 file changed, 19 insertions(+), 31 deletions(-) diff --git a/modules/ngx_tcp_monitor_server_module.c b/modules/ngx_tcp_monitor_server_module.c index a44d1b1..25f6f26 100644 --- a/modules/ngx_tcp_monitor_server_module.c +++ b/modules/ngx_tcp_monitor_server_module.c @@ -33,15 +33,16 @@ typedef struct ngx_tcp_monitor_header_s { typedef struct ngx_tcp_monitor_s { ngx_peer_connection_t *upstream; - ngx_buf_t *buffer; + ngx_buf_t *request_body; + + ngx_buf_t *header_out; + ngx_buf_t *reponse_body; } ngx_tcp_monitor_ctx_t; typedef struct ngx_tcp_monitor_conf_s { ngx_tcp_upstream_conf_t upstream; - ngx_str_t url; - size_t buffer_size; } ngx_tcp_monitor_conf_t; @@ -84,13 +85,6 @@ static ngx_command_t ngx_tcp_monitor_commands[] = { 0, NULL }, - { ngx_string("monitor_buffer"), - NGX_TCP_MAIN_CONF|NGX_TCP_SRV_CONF|NGX_CONF_TAKE1, - ngx_conf_set_size_slot, - NGX_TCP_SRV_CONF_OFFSET, - offsetof(ngx_tcp_monitor_conf_t, buffer_size), - NULL }, - { ngx_string("monitor_connect_timeout"), NGX_TCP_MAIN_CONF|NGX_TCP_SRV_CONF|NGX_CONF_TAKE1, ngx_conf_set_msec_slot, @@ -147,8 +141,8 @@ static void ngx_tcp_monitor_init_session(ngx_tcp_session_t *s) { ngx_connection_t *c; - ngx_tcp_monitor_conf_t *pcf; ngx_tcp_core_srv_conf_t *cscf; + ngx_tcp_monitor_ctx_t *ctx; c = s->connection; @@ -156,9 +150,7 @@ ngx_tcp_monitor_init_session(ngx_tcp_session_t *s) cscf = ngx_tcp_get_module_srv_conf(s, ngx_tcp_core_module); - pcf = ngx_tcp_get_module_srv_conf(s, ngx_tcp_monitor_module); - - s->buffer = ngx_create_temp_buf(s->connection->pool, pcf->buffer_size); + s->buffer = ngx_create_temp_buf(s->connection->pool, HEADER_LENGTH); if (s->buffer == NULL) { ngx_tcp_finalize_session(s); return; @@ -171,6 +163,14 @@ ngx_tcp_monitor_init_session(ngx_tcp_session_t *s) ngx_add_timer(c->read, cscf->timeout); + ctx = ngx_pcalloc(s->connection->pool, sizeof(ngx_tcp_monitor_ctx_t)); + if (ctx == NULL) { + ngx_tcp_finalize_session(s); + return; + } + + ngx_tcp_set_ctx(s, ctx, ngx_tcp_monitor_module); + // We will call this after we receive data completely // ngx_tcp_monitor_init_upstream(c, s); @@ -260,8 +260,8 @@ static void ngx_tcp_monitor_init_upstream(ngx_connection_t *c, ngx_tcp_session_t *s) { ngx_tcp_upstream_t *u; - ngx_tcp_monitor_ctx_t *p; - ngx_tcp_monitor_conf_t *pcf; + ngx_tcp_monitor_ctx_t *p; + ngx_tcp_monitor_conf_t *pcf; s->connection->log->action = "ngx_tcp_monitor_init_upstream"; @@ -271,14 +271,6 @@ ngx_tcp_monitor_init_upstream(ngx_connection_t *c, ngx_tcp_session_t *s) return; } - p = ngx_pcalloc(s->connection->pool, sizeof(ngx_tcp_monitor_ctx_t)); - if (p == NULL) { - ngx_tcp_finalize_session(s); - return; - } - - ngx_tcp_set_ctx(s, p, ngx_tcp_monitor_module); - if (ngx_tcp_upstream_create(s) != NGX_OK) { ngx_tcp_finalize_session(s); return; @@ -291,10 +283,11 @@ ngx_tcp_monitor_init_upstream(ngx_connection_t *c, ngx_tcp_session_t *s) u->write_event_handler = ngx_tcp_upstream_init_monitor_handler; u->read_event_handler = ngx_tcp_upstream_init_monitor_handler; + p = ngx_tcp_get_module_ctx(s, ngx_tcp_monitor_module); p->upstream = &u->peer; - p->buffer = ngx_create_temp_buf(s->connection->pool, pcf->buffer_size); - if (p->buffer == NULL) { + p->header_out = ngx_create_temp_buf(s->connection->pool, HEADER_LENGTH); + if (p->header_out == NULL) { ngx_tcp_finalize_session(s); return; } @@ -636,8 +629,6 @@ ngx_tcp_monitor_create_conf(ngx_conf_t *cf) return NULL; } - pcf->buffer_size = NGX_CONF_UNSET_SIZE; - pcf->upstream.connect_timeout = NGX_CONF_UNSET_MSEC; pcf->upstream.send_timeout = NGX_CONF_UNSET_MSEC; pcf->upstream.read_timeout = NGX_CONF_UNSET_MSEC; @@ -652,9 +643,6 @@ ngx_tcp_monitor_merge_conf(ngx_conf_t *cf, void *parent, void *child) ngx_tcp_monitor_conf_t *prev = parent; ngx_tcp_monitor_conf_t *conf = child; - ngx_conf_merge_size_value(conf->buffer_size, prev->buffer_size, - (size_t) ngx_pagesize); - ngx_conf_merge_msec_value(conf->upstream.connect_timeout, prev->upstream.connect_timeout, 60000); From cc8cdfb32e638e9099d7d949535cd0070e7399e9 Mon Sep 17 00:00:00 2001 From: ideal Date: Mon, 25 Nov 2013 17:06:21 +0800 Subject: [PATCH 20/34] read client data --- modules/ngx_tcp_monitor_server_module.c | 79 +++++++++++++++++++++++-- 1 file changed, 75 insertions(+), 4 deletions(-) diff --git a/modules/ngx_tcp_monitor_server_module.c b/modules/ngx_tcp_monitor_server_module.c index 25f6f26..b90a0c7 100644 --- a/modules/ngx_tcp_monitor_server_module.c +++ b/modules/ngx_tcp_monitor_server_module.c @@ -33,7 +33,10 @@ typedef struct ngx_tcp_monitor_header_s { typedef struct ngx_tcp_monitor_s { ngx_peer_connection_t *upstream; + // ngx_tcp_session_t's buffer is header_in + // request_body is the request body ngx_buf_t *request_body; + ngx_uint_t request_len; ngx_buf_t *header_out; ngx_buf_t *reponse_body; @@ -186,8 +189,12 @@ ngx_tcp_monitor_init_session(ngx_tcp_session_t *s) static void ngx_tcp_monitor_client_read_handler(ngx_event_t *rev) { - ngx_connection_t *c; - ngx_tcp_session_t *s; + ssize_t n, size; + ngx_err_t err; + ngx_buf_t *b; + ngx_connection_t *c; + ngx_tcp_session_t *s; + ngx_tcp_monitor_ctx_t *pctx; c = rev->data; s = c->data; @@ -195,6 +202,72 @@ ngx_tcp_monitor_client_read_handler(ngx_event_t *rev) ngx_log_debug1(NGX_LOG_DEBUG_TCP, rev->log, 0, "tcp monitor client read handler: %d", c->fd); + if (rev->timedout) { + c->log->action = "monitoring"; + + ngx_log_error(NGX_LOG_INFO, c->log, NGX_ETIMEDOUT, "monitor timed out"); + c->timedout = 1; + + ngx_tcp_finalize_session(s); + return; + } + + pctx = ngx_tcp_get_module_ctx(s, ngx_tcp_monitor_module); + if (pctx == NULL) { + ngx_tcp_finalize_session(s); + return; + } + + for ( ;; ) { + if (c->read->ready) { + + c->log->action = "client read: reading from client"; + if (s->bytes_read < (off_t)HEADER_LENGTH) { + size = HEADER_LENGTH - s->bytes_read; + b = s->buffer; + } else { + if (pctx->request_body == NULL) { + pctx->request_len = monitor_packet_size(s->buffer->start); + pctx->request_body = ngx_create_temp_buf(c->pool, + pctx->request_len); + } + size = pctx->request_len - s->bytes_read + HEADER_LENGTH; + b = pctx->request_body; + } + n = c->recv(c, b->last, size); + err = ngx_socket_errno; + + if (n == NGX_ERROR) { + ngx_log_error(NGX_LOG_ERR, c->log, err, "client read error"); + } + ngx_log_debug1(NGX_LOG_DEBUG_TCP, rev->log, 0, + "tcp monitor handler recv:%d", n); + + if (n == NGX_AGAIN || n == 0) { + break; + } + + if (n > 0) { + b->last += n; + s->bytes_read += n; + continue; + } + + if (n == NGX_ERROR) { + c->read->eof = 1; + } + } + + break; + } + + if ((s->connection->read->eof && s->bytes_read == (off_t)(pctx->request_len + HEADER_LENGTH))) + { + ngx_log_error(NGX_LOG_DEBUG, c->log, 0, "read client data done"); + ngx_tcp_monitor_init_upstream(c, s); + return; + } + if (ngx_handle_read_event(rev, 0) != NGX_OK) { ngx_tcp_finalize_session(s); } @@ -234,8 +307,6 @@ static void ngx_tcp_monitor_upstream_read_handler(ngx_event_t *rev) ngx_tcp_finalize_session(s); } - // TODO: add condition - ngx_tcp_monitor_init_upstream(c, s); } From b32195753ae4c07019800cfcc7c07c61e5df1017 Mon Sep 17 00:00:00 2001 From: ideal Date: Mon, 25 Nov 2013 19:16:18 +0800 Subject: [PATCH 21/34] read client data --- modules/ngx_tcp_monitor_server_module.c | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/modules/ngx_tcp_monitor_server_module.c b/modules/ngx_tcp_monitor_server_module.c index b90a0c7..0f36686 100644 --- a/modules/ngx_tcp_monitor_server_module.c +++ b/modules/ngx_tcp_monitor_server_module.c @@ -186,6 +186,11 @@ ngx_tcp_monitor_init_session(ngx_tcp_session_t *s) } +/* + * FIXME: 1. I am not sure below will block! + * 2. Server did not close connection currently! + * + */ static void ngx_tcp_monitor_client_read_handler(ngx_event_t *rev) { From 2484a6dcfe3ff1a178ded17a97b0632f315b6621 Mon Sep 17 00:00:00 2001 From: ideal Date: Tue, 26 Nov 2013 12:09:15 +0800 Subject: [PATCH 22/34] finalize session if client data is not correct --- modules/ngx_tcp_monitor_server_module.c | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/modules/ngx_tcp_monitor_server_module.c b/modules/ngx_tcp_monitor_server_module.c index 0f36686..fea1ed9 100644 --- a/modules/ngx_tcp_monitor_server_module.c +++ b/modules/ngx_tcp_monitor_server_module.c @@ -239,11 +239,16 @@ ngx_tcp_monitor_client_read_handler(ngx_event_t *rev) size = pctx->request_len - s->bytes_read + HEADER_LENGTH; b = pctx->request_body; } + if (size < 0) { + ngx_log_error(NGX_LOG_ERR, c->log, 0, + "client data not correct, handler: %d", c->fd); + } n = c->recv(c, b->last, size); err = ngx_socket_errno; if (n == NGX_ERROR) { ngx_log_error(NGX_LOG_ERR, c->log, err, "client read error"); + ngx_tcp_finalize_session(s); } ngx_log_debug1(NGX_LOG_DEBUG_TCP, rev->log, 0, "tcp monitor handler recv:%d", n); @@ -324,7 +329,7 @@ static void ngx_tcp_monitor_upstream_write_handler(ngx_event_t *wev) s = c->data; ngx_log_debug1(NGX_LOG_DEBUG_TCP, wev->log, 0, - "tcp monitor upstream read handler: %d", c->fd); + "tcp monitor upstream write handler: %d", c->fd); if (ngx_handle_write_event(wev, 0) != NGX_OK) { ngx_tcp_finalize_session(s); From ef41976ab881961d85f0aa1287ffd829e61d14bd Mon Sep 17 00:00:00 2001 From: ideal Date: Wed, 27 Nov 2013 15:29:58 +0800 Subject: [PATCH 23/34] add query for redis --- modules/ngx_tcp_monitor_server_module.c | 96 ++++++++++++++++++++++--- 1 file changed, 87 insertions(+), 9 deletions(-) diff --git a/modules/ngx_tcp_monitor_server_module.c b/modules/ngx_tcp_monitor_server_module.c index fea1ed9..f20c816 100644 --- a/modules/ngx_tcp_monitor_server_module.c +++ b/modules/ngx_tcp_monitor_server_module.c @@ -28,11 +28,17 @@ typedef struct ngx_tcp_monitor_header_s { (*((u_char *)(ptr) + 3) << 24) ) #define MONITOR_TYPE_OFFSET offsetof(ngx_tcp_monitor_header_t, type) -#define monitor_type(ptr) (*((u_char *)(ptr) + MONITOR_TYPE_OFFSET) + \ +#define monitor_packet_type(ptr) (*((u_char *)(ptr) + MONITOR_TYPE_OFFSET) + \ (*((u_char *)(ptr) + MONITOR_TYPE_OFFSET + 1) << 8) ) -typedef struct ngx_tcp_monitor_s { +#define PACKET_TYPE_JSON 1 +#define PACKET_TYPE_TLV 2 +#define PACKET_TYPE_BSON 3 +#define PACKET_TYPE_MSGPACK 4 + +typedef struct ngx_tcp_monitor_ctx_s { ngx_peer_connection_t *upstream; + // ngx_tcp_session_t's buffer is header_in // request_body is the request body ngx_buf_t *request_body; @@ -40,14 +46,31 @@ typedef struct ngx_tcp_monitor_s { ngx_buf_t *header_out; ngx_buf_t *reponse_body; + + ngx_buf_t *upstream_request_header; + ngx_buf_t *upstream_request_tail; } ngx_tcp_monitor_ctx_t; typedef struct ngx_tcp_monitor_conf_s { ngx_tcp_upstream_conf_t upstream; ngx_str_t url; + ngx_str_t queue_name; } ngx_tcp_monitor_conf_t; +#if 0 +static size_t ngx_get_num_size(ngx_uint_t i) +{ + size_t n = 0; + + do { + i /= 10; + n++; + } while (i > 0); + + return n; +} +#endif static void ngx_tcp_monitor_init_session(ngx_tcp_session_t *s); static void ngx_tcp_monitor_init_upstream(ngx_connection_t *c, @@ -59,12 +82,11 @@ static void ngx_tcp_monitor_client_read_handler(ngx_event_t *rev); static void ngx_tcp_monitor_client_write_handler(ngx_event_t *wev); static void ngx_tcp_monitor_upstream_read_handler(ngx_event_t *rev); static void ngx_tcp_monitor_upstream_write_handler(ngx_event_t *wev); -#if 0 -static void ngx_tcp_monitor_handler(ngx_event_t *ev); -#endif static void *ngx_tcp_monitor_create_conf(ngx_conf_t *cf); static char *ngx_tcp_monitor_merge_conf(ngx_conf_t *cf, void *parent, void *child); +static ngx_int_t ngx_tcp_monitor_build_query(ngx_tcp_session_t *s, + ngx_buf_t **header, ngx_buf_t **tail); static ngx_tcp_protocol_t ngx_tcp_generic_protocol = { @@ -88,6 +110,13 @@ static ngx_command_t ngx_tcp_monitor_commands[] = { 0, NULL }, + { ngx_string("queue_name"), + NGX_TCP_SRV_CONF|NGX_CONF_TAKE1, + ngx_conf_set_str_slot, + NGX_TCP_SRV_CONF_OFFSET, + offsetof(ngx_tcp_monitor_conf_t, queue_name), + NULL }, + { ngx_string("monitor_connect_timeout"), NGX_TCP_MAIN_CONF|NGX_TCP_SRV_CONF|NGX_CONF_TAKE1, ngx_conf_set_msec_slot, @@ -119,8 +148,8 @@ static ngx_tcp_module_t ngx_tcp_monitor_module_ctx = { NULL, /* create main configuration */ NULL, /* init main configuration */ - ngx_tcp_monitor_create_conf, /* create server configuration */ - ngx_tcp_monitor_merge_conf /* merge server configuration */ + ngx_tcp_monitor_create_conf, /* create server configuration */ + ngx_tcp_monitor_merge_conf /* merge server configuration */ }; @@ -195,6 +224,7 @@ static void ngx_tcp_monitor_client_read_handler(ngx_event_t *rev) { ssize_t n, size; + ngx_int_t rc; ngx_err_t err; ngx_buf_t *b; ngx_connection_t *c; @@ -271,9 +301,15 @@ ngx_tcp_monitor_client_read_handler(ngx_event_t *rev) break; } - if ((s->connection->read->eof && s->bytes_read == (off_t)(pctx->request_len + HEADER_LENGTH))) + if ((s->connection->read->eof && + s->bytes_read == (off_t)(pctx->request_len + HEADER_LENGTH))) { ngx_log_error(NGX_LOG_DEBUG, c->log, 0, "read client data done"); + rc = ngx_tcp_monitor_build_query(s, &pctx->upstream_request_header, + &pctx->upstream_request_tail); + if (rc != NGX_OK) { + ngx_tcp_finalize_session(s); + } ngx_tcp_monitor_init_upstream(c, s); return; } @@ -316,7 +352,6 @@ static void ngx_tcp_monitor_upstream_read_handler(ngx_event_t *rev) if (ngx_handle_read_event(rev, 0) != NGX_OK) { ngx_tcp_finalize_session(s); } - } @@ -331,12 +366,55 @@ static void ngx_tcp_monitor_upstream_write_handler(ngx_event_t *wev) ngx_log_debug1(NGX_LOG_DEBUG_TCP, wev->log, 0, "tcp monitor upstream write handler: %d", c->fd); + for ( ;; ) { + break; + } + if (ngx_handle_write_event(wev, 0) != NGX_OK) { ngx_tcp_finalize_session(s); } } +static ngx_int_t +ngx_tcp_monitor_build_query(ngx_tcp_session_t *s, ngx_buf_t **header, ngx_buf_t **tail) +{ + size_t len; + u_short packet_type; + ngx_tcp_monitor_conf_t *pcf; + + pcf = ngx_tcp_get_module_srv_conf(s, ngx_tcp_monitor_module); + packet_type = monitor_packet_type(s->buffer->start); + // FIXME: below is specific to redis protocol + // http://redis.io/topics/protocol + switch(packet_type) { + case PACKET_TYPE_JSON: + len = sizeof("LPUSH ") - 1 + pcf->queue_name.len + 1; // last +1 for space + *header = ngx_create_temp_buf(s->connection->pool, len); + if (*header == NULL) { + return NGX_ERROR; + } + ngx_sprintf((*header)->last, "LPUSH %*s ", + pcf->queue_name.len, + pcf->queue_name.data); + len = sizeof(CRLF) -1; + *tail = ngx_create_temp_buf(s->connection->pool, len); + if (*tail == NULL) { + return NGX_ERROR; + } + ngx_memcpy((*tail)->last, CRLF, sizeof(CRLF) - 1); + break; + + default: + ngx_log_error(NGX_LOG_INFO, s->connection->log, 0, + "invalid monitor packet type: %hu", packet_type); + return NGX_ERROR; + } + + return NGX_OK; +} + + static void ngx_tcp_monitor_init_upstream(ngx_connection_t *c, ngx_tcp_session_t *s) { From 20e480f9a53b25d7cf964af36d3296ccd0c7163c Mon Sep 17 00:00:00 2001 From: ideal Date: Wed, 27 Nov 2013 17:48:48 +0800 Subject: [PATCH 24/34] add query for redis --- modules/ngx_tcp_monitor_server_module.c | 73 ++++++++++++++++++++++++- 1 file changed, 70 insertions(+), 3 deletions(-) diff --git a/modules/ngx_tcp_monitor_server_module.c b/modules/ngx_tcp_monitor_server_module.c index f20c816..19b98d4 100644 --- a/modules/ngx_tcp_monitor_server_module.c +++ b/modules/ngx_tcp_monitor_server_module.c @@ -272,6 +272,8 @@ ngx_tcp_monitor_client_read_handler(ngx_event_t *rev) if (size < 0) { ngx_log_error(NGX_LOG_ERR, c->log, 0, "client data not correct, handler: %d", c->fd); + ngx_tcp_finalize_session(s); + return; } n = c->recv(c, b->last, size); err = ngx_socket_errno; @@ -279,6 +281,7 @@ ngx_tcp_monitor_client_read_handler(ngx_event_t *rev) if (n == NGX_ERROR) { ngx_log_error(NGX_LOG_ERR, c->log, err, "client read error"); ngx_tcp_finalize_session(s); + return; } ngx_log_debug1(NGX_LOG_DEBUG_TCP, rev->log, 0, "tcp monitor handler recv:%d", n); @@ -309,6 +312,7 @@ ngx_tcp_monitor_client_read_handler(ngx_event_t *rev) &pctx->upstream_request_tail); if (rc != NGX_OK) { ngx_tcp_finalize_session(s); + return; } ngx_tcp_monitor_init_upstream(c, s); return; @@ -316,6 +320,7 @@ ngx_tcp_monitor_client_read_handler(ngx_event_t *rev) if (ngx_handle_read_event(rev, 0) != NGX_OK) { ngx_tcp_finalize_session(s); + return; } } @@ -351,27 +356,91 @@ static void ngx_tcp_monitor_upstream_read_handler(ngx_event_t *rev) if (ngx_handle_read_event(rev, 0) != NGX_OK) { ngx_tcp_finalize_session(s); + return; } } static void ngx_tcp_monitor_upstream_write_handler(ngx_event_t *wev) { + ssize_t n, size; ngx_connection_t *c; ngx_tcp_session_t *s; + ngx_uint_t header_length; + ngx_uint_t tail_length; + ngx_tcp_monitor_ctx_t *pctx; + ngx_buf_t *b; + ngx_err_t err; c = wev->data; s = c->data; + if (wev->timedout) { + c->log->action = "monitoring"; + + ngx_log_error(NGX_LOG_INFO, c->log, NGX_ETIMEDOUT, "monitor timed out"); + c->timedout = 1; + + ngx_tcp_finalize_session(s); + return; + } + + pctx = ngx_tcp_get_module_ctx(s, ngx_tcp_monitor_module); + if (pctx == NULL) { + ngx_tcp_finalize_session(s); + return; + } + ngx_log_debug1(NGX_LOG_DEBUG_TCP, wev->log, 0, "tcp monitor upstream write handler: %d", c->fd); + header_length = pctx->upstream_request_header->end - + pctx->upstream_request_header->start; + tail_length = pctx->upstream_request_tail->end - + pctx->upstream_request_tail->start; for ( ;; ) { + if (c->write->ready) { + c->log->action = "upstream send: sending to upstream server"; + size = header_length - s->bytes_write; + b = pctx->upstream_request_header; + if (size <= 0) { + size += pctx->request_len; + b = pctx->request_body; + } + if (size <= 0) { + size += tail_length; + b = pctx->upstream_request_tail; + } + if (size <= 0) { + break; + } + n = c->send(c, b->pos, size); + err = ngx_socket_errno; + + if (n == NGX_ERROR) { + ngx_log_error(NGX_LOG_ERR, c->log, err, "monitor upstream send error"); + return; + } + if (n > 0) { + b->pos +=n; + s->bytes_write += n; + } + } break; } + if (s->bytes_write == (off_t)(header_length + pctx->request_len + tail_length)) { + ngx_log_error(NGX_LOG_DEBUG, c->log, 0, "upstream send data done"); + if (ngx_handle_read_event(c->read, 0) != NGX_OK) { + ngx_tcp_finalize_session(s); + return; + } + return; + } + if (ngx_handle_write_event(wev, 0) != NGX_OK) { ngx_tcp_finalize_session(s); + return; } } @@ -403,15 +472,13 @@ ngx_tcp_monitor_build_query(ngx_tcp_session_t *s, ngx_buf_t **header, ngx_buf_t return NGX_ERROR; } ngx_memcpy((*tail)->last, CRLF, sizeof(CRLF) - 1); - break; + return NGX_OK; default: ngx_log_error(NGX_LOG_INFO, s->connection->log, 0, "invalid monitor packet type: %hu", packet_type); return NGX_ERROR; } - - return NGX_OK; } From 13d8d72d3f0492698b38dcffcf8696d90199a9ab Mon Sep 17 00:00:00 2001 From: ideal Date: Wed, 27 Nov 2013 19:25:14 +0800 Subject: [PATCH 25/34] add query for redis --- modules/ngx_tcp_monitor_server_module.c | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/modules/ngx_tcp_monitor_server_module.c b/modules/ngx_tcp_monitor_server_module.c index 19b98d4..894f734 100644 --- a/modules/ngx_tcp_monitor_server_module.c +++ b/modules/ngx_tcp_monitor_server_module.c @@ -304,8 +304,7 @@ ngx_tcp_monitor_client_read_handler(ngx_event_t *rev) break; } - if ((s->connection->read->eof && - s->bytes_read == (off_t)(pctx->request_len + HEADER_LENGTH))) + if (s->bytes_read == (off_t)(pctx->request_len + HEADER_LENGTH)) { ngx_log_error(NGX_LOG_DEBUG, c->log, 0, "read client data done"); rc = ngx_tcp_monitor_build_query(s, &pctx->upstream_request_header, @@ -371,20 +370,25 @@ static void ngx_tcp_monitor_upstream_write_handler(ngx_event_t *wev) ngx_tcp_monitor_ctx_t *pctx; ngx_buf_t *b; ngx_err_t err; + ngx_tcp_monitor_conf_t *pcf; c = wev->data; s = c->data; + ngx_log_debug0(NGX_LOG_DEBUG_TCP, s->connection->log, 0, + "tcp monitor upstream writer handler"); + if (wev->timedout) { c->log->action = "monitoring"; - ngx_log_error(NGX_LOG_INFO, c->log, NGX_ETIMEDOUT, "monitor timed out"); + ngx_log_error(NGX_LOG_INFO, c->log, NGX_ETIMEDOUT, "monitor upstream send timed out"); c->timedout = 1; ngx_tcp_finalize_session(s); return; } + pcf = ngx_tcp_get_module_srv_conf(s, ngx_tcp_monitor_module); pctx = ngx_tcp_get_module_ctx(s, ngx_tcp_monitor_module); if (pctx == NULL) { ngx_tcp_finalize_session(s); @@ -431,6 +435,7 @@ static void ngx_tcp_monitor_upstream_write_handler(ngx_event_t *wev) if (s->bytes_write == (off_t)(header_length + pctx->request_len + tail_length)) { ngx_log_error(NGX_LOG_DEBUG, c->log, 0, "upstream send data done"); + ngx_add_timer(c->read, pcf->upstream.read_timeout); if (ngx_handle_read_event(c->read, 0) != NGX_OK) { ngx_tcp_finalize_session(s); return; @@ -562,7 +567,6 @@ ngx_tcp_upstream_init_monitor_handler(ngx_tcp_session_t *s, ngx_tcp_upstream_t * c->read->handler = ngx_tcp_monitor_upstream_read_handler; c->write->handler = ngx_tcp_monitor_upstream_write_handler; - ngx_add_timer(c->read, pcf->upstream.read_timeout); ngx_add_timer(c->write, pcf->upstream.send_timeout); if (ngx_handle_write_event(c->write, 0) != NGX_OK) { From d8aaac771863f041cf9069033752c6cdffd48f79 Mon Sep 17 00:00:00 2001 From: ideal Date: Wed, 27 Nov 2013 22:03:42 +0800 Subject: [PATCH 26/34] init upstream --- modules/ngx_tcp_monitor_server_module.c | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/modules/ngx_tcp_monitor_server_module.c b/modules/ngx_tcp_monitor_server_module.c index 894f734..61aeabd 100644 --- a/modules/ngx_tcp_monitor_server_module.c +++ b/modules/ngx_tcp_monitor_server_module.c @@ -376,7 +376,7 @@ static void ngx_tcp_monitor_upstream_write_handler(ngx_event_t *wev) s = c->data; ngx_log_debug0(NGX_LOG_DEBUG_TCP, s->connection->log, 0, - "tcp monitor upstream writer handler"); + "tcp monitor upstream write handler"); if (wev->timedout) { c->log->action = "monitoring"; @@ -512,7 +512,7 @@ ngx_tcp_monitor_init_upstream(ngx_connection_t *c, ngx_tcp_session_t *s) u->conf = &pcf->upstream; u->write_event_handler = ngx_tcp_upstream_init_monitor_handler; - u->read_event_handler = ngx_tcp_upstream_init_monitor_handler; + u->read_event_handler = ngx_tcp_upstream_init_monitor_handler; p = ngx_tcp_get_module_ctx(s, ngx_tcp_monitor_module); p->upstream = &u->peer; @@ -574,6 +574,9 @@ ngx_tcp_upstream_init_monitor_handler(ngx_tcp_session_t *s, ngx_tcp_upstream_t * return; } + ngx_log_debug0(NGX_LOG_DEBUG_TCP, s->connection->log, 0, + "tcp monitor upstream init monitor done"); + return; } From e9061bd22d8369858caa57e619ca83d28e44d3bb Mon Sep 17 00:00:00 2001 From: ideal Date: Thu, 28 Nov 2013 16:09:08 +0800 Subject: [PATCH 27/34] add query for redis --- modules/ngx_tcp_monitor_server_module.c | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/modules/ngx_tcp_monitor_server_module.c b/modules/ngx_tcp_monitor_server_module.c index 61aeabd..bde4518 100644 --- a/modules/ngx_tcp_monitor_server_module.c +++ b/modules/ngx_tcp_monitor_server_module.c @@ -88,7 +88,7 @@ static char *ngx_tcp_monitor_merge_conf(ngx_conf_t *cf, void *parent, static ngx_int_t ngx_tcp_monitor_build_query(ngx_tcp_session_t *s, ngx_buf_t **header, ngx_buf_t **tail); -static ngx_tcp_protocol_t ngx_tcp_generic_protocol = { +static ngx_tcp_protocol_t ngx_tcp_monitor_protocol = { ngx_string("monitor_server"), { 0, 0, 0, 0 }, @@ -143,7 +143,7 @@ static ngx_command_t ngx_tcp_monitor_commands[] = { static ngx_tcp_module_t ngx_tcp_monitor_module_ctx = { - &ngx_tcp_generic_protocol, /* protocol */ + &ngx_tcp_monitor_protocol, /* protocol */ NULL, /* create main configuration */ NULL, /* init main configuration */ @@ -428,6 +428,7 @@ static void ngx_tcp_monitor_upstream_write_handler(ngx_event_t *wev) if (n > 0) { b->pos +=n; s->bytes_write += n; + continue; } } break; @@ -435,6 +436,7 @@ static void ngx_tcp_monitor_upstream_write_handler(ngx_event_t *wev) if (s->bytes_write == (off_t)(header_length + pctx->request_len + tail_length)) { ngx_log_error(NGX_LOG_DEBUG, c->log, 0, "upstream send data done"); + ngx_del_timer(c->write); ngx_add_timer(c->read, pcf->upstream.read_timeout); if (ngx_handle_read_event(c->read, 0) != NGX_OK) { ngx_tcp_finalize_session(s); @@ -574,6 +576,8 @@ ngx_tcp_upstream_init_monitor_handler(ngx_tcp_session_t *s, ngx_tcp_upstream_t * return; } + c->write->handler(c->write); + ngx_log_debug0(NGX_LOG_DEBUG_TCP, s->connection->log, 0, "tcp monitor upstream init monitor done"); @@ -824,7 +828,7 @@ ngx_tcp_monitor_pass(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) } if (cscf->protocol == NULL) { - cscf->protocol = &ngx_tcp_generic_protocol; + cscf->protocol = &ngx_tcp_monitor_protocol; } if (pcf->upstream.upstream) { From 1ec7f221499acb84bc6dff5bd17bfabd723d38c6 Mon Sep 17 00:00:00 2001 From: ideal Date: Thu, 28 Nov 2013 17:24:21 +0800 Subject: [PATCH 28/34] change redis query protocol --- modules/ngx_tcp_monitor_server_module.c | 24 +++++++++++++++++------- 1 file changed, 17 insertions(+), 7 deletions(-) diff --git a/modules/ngx_tcp_monitor_server_module.c b/modules/ngx_tcp_monitor_server_module.c index bde4518..ba6084f 100644 --- a/modules/ngx_tcp_monitor_server_module.c +++ b/modules/ngx_tcp_monitor_server_module.c @@ -58,7 +58,6 @@ typedef struct ngx_tcp_monitor_conf_s { ngx_str_t queue_name; } ngx_tcp_monitor_conf_t; -#if 0 static size_t ngx_get_num_size(ngx_uint_t i) { size_t n = 0; @@ -70,7 +69,6 @@ static size_t ngx_get_num_size(ngx_uint_t i) return n; } -#endif static void ngx_tcp_monitor_init_session(ngx_tcp_session_t *s); static void ngx_tcp_monitor_init_upstream(ngx_connection_t *c, @@ -436,7 +434,9 @@ static void ngx_tcp_monitor_upstream_write_handler(ngx_event_t *wev) if (s->bytes_write == (off_t)(header_length + pctx->request_len + tail_length)) { ngx_log_error(NGX_LOG_DEBUG, c->log, 0, "upstream send data done"); - ngx_del_timer(c->write); + if (c->write->timer_set) { + ngx_del_timer(c->write); + } ngx_add_timer(c->read, pcf->upstream.read_timeout); if (ngx_handle_read_event(c->read, 0) != NGX_OK) { ngx_tcp_finalize_session(s); @@ -458,21 +458,31 @@ ngx_tcp_monitor_build_query(ngx_tcp_session_t *s, ngx_buf_t **header, ngx_buf_t size_t len; u_short packet_type; ngx_tcp_monitor_conf_t *pcf; + ngx_tcp_monitor_ctx_t *pctx; - pcf = ngx_tcp_get_module_srv_conf(s, ngx_tcp_monitor_module); + pcf = ngx_tcp_get_module_srv_conf(s, ngx_tcp_monitor_module); + pctx = ngx_tcp_get_module_ctx(s, ngx_tcp_monitor_module); packet_type = monitor_packet_type(s->buffer->start); // FIXME: below is specific to redis protocol // http://redis.io/topics/protocol switch(packet_type) { case PACKET_TYPE_JSON: - len = sizeof("LPUSH ") - 1 + pcf->queue_name.len + 1; // last +1 for space + len = sizeof("*3" CRLF "$5" CRLF "LPUSH" CRLF "$") -1 + + ngx_get_num_size(pcf->queue_name.len) + + sizeof(CRLF) -1 + pcf->queue_name.len + + sizeof(CRLF "$") -1 + + ngx_get_num_size(pctx->request_len) + + sizeof(CRLF) - 1; *header = ngx_create_temp_buf(s->connection->pool, len); if (*header == NULL) { return NGX_ERROR; } - ngx_sprintf((*header)->last, "LPUSH %*s ", + ngx_sprintf((*header)->last, "*3"CRLF"$5"CRLF"LPUSH"CRLF + "$%d"CRLF"%*s"CRLF"$%d"CRLF, + pcf->queue_name.len, pcf->queue_name.len, - pcf->queue_name.data); + pcf->queue_name.data, + pctx->request_len); len = sizeof(CRLF) -1; *tail = ngx_create_temp_buf(s->connection->pool, len); if (*tail == NULL) { From 9b08725de7d4710134bbba4d59073b522280a4a2 Mon Sep 17 00:00:00 2001 From: ideal Date: Thu, 28 Nov 2013 19:58:24 +0800 Subject: [PATCH 29/34] change ngx_get_num_size() to inline --- modules/ngx_tcp_monitor_server_module.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/modules/ngx_tcp_monitor_server_module.c b/modules/ngx_tcp_monitor_server_module.c index ba6084f..68e353d 100644 --- a/modules/ngx_tcp_monitor_server_module.c +++ b/modules/ngx_tcp_monitor_server_module.c @@ -58,7 +58,7 @@ typedef struct ngx_tcp_monitor_conf_s { ngx_str_t queue_name; } ngx_tcp_monitor_conf_t; -static size_t ngx_get_num_size(ngx_uint_t i) +static inline size_t ngx_get_num_size(ngx_uint_t i) { size_t n = 0; @@ -71,7 +71,7 @@ static size_t ngx_get_num_size(ngx_uint_t i) } static void ngx_tcp_monitor_init_session(ngx_tcp_session_t *s); -static void ngx_tcp_monitor_init_upstream(ngx_connection_t *c, +static void ngx_tcp_monitor_init_upstream(ngx_connection_t *c, ngx_tcp_session_t *s); static void ngx_tcp_upstream_init_monitor_handler(ngx_tcp_session_t *s, ngx_tcp_upstream_t *u); From b473b47fb53f23795f2d1390472c7d13045b7ab9 Mon Sep 17 00:00:00 2001 From: ideal Date: Fri, 29 Nov 2013 14:38:20 +0800 Subject: [PATCH 30/34] it works, but need to reconstruct --- modules/ngx_tcp_monitor_server_module.c | 386 ++++++++++-------------- 1 file changed, 160 insertions(+), 226 deletions(-) diff --git a/modules/ngx_tcp_monitor_server_module.c b/modules/ngx_tcp_monitor_server_module.c index 68e353d..b79af16 100644 --- a/modules/ngx_tcp_monitor_server_module.c +++ b/modules/ngx_tcp_monitor_server_module.c @@ -31,6 +31,18 @@ typedef struct ngx_tcp_monitor_header_s { #define monitor_packet_type(ptr) (*((u_char *)(ptr) + MONITOR_TYPE_OFFSET) + \ (*((u_char *)(ptr) + MONITOR_TYPE_OFFSET + 1) << 8) ) +#define set_monitor_packet_size(ptr, size) do { \ + *(u_char *)(ptr) = (size) & 0xff; \ + *((u_char *)(ptr) + 1) = ((size) >> 8) & 0xff; \ + *((u_char *)(ptr) + 2) = ((size) >> 16) & 0xff; \ + *((u_char *)(ptr) + 3) = ((size) >> 24) & 0xff; \ + } while(0) + +#define set_monitor_return_code(ptr, code) do { \ + *((u_char *)(ptr) + MONITOR_TYPE_OFFSET) = (code) & 0xff; \ + *((u_char *)(ptr) + MONITOR_TYPE_OFFSET + 1) = ((code) >> 8) & 0xff; \ + } while(0) + #define PACKET_TYPE_JSON 1 #define PACKET_TYPE_TLV 2 #define PACKET_TYPE_BSON 3 @@ -45,10 +57,11 @@ typedef struct ngx_tcp_monitor_ctx_s { ngx_uint_t request_len; ngx_buf_t *header_out; - ngx_buf_t *reponse_body; ngx_buf_t *upstream_request_header; ngx_buf_t *upstream_request_tail; + + ngx_buf_t *upstream_response; } ngx_tcp_monitor_ctx_t; @@ -85,6 +98,7 @@ static char *ngx_tcp_monitor_merge_conf(ngx_conf_t *cf, void *parent, void *child); static ngx_int_t ngx_tcp_monitor_build_query(ngx_tcp_session_t *s, ngx_buf_t **header, ngx_buf_t **tail); +static ngx_int_t ngx_tcp_monitor_build_response(ngx_tcp_session_t *s); static ngx_tcp_protocol_t ngx_tcp_monitor_protocol = { @@ -325,15 +339,60 @@ ngx_tcp_monitor_client_read_handler(ngx_event_t *rev) static void ngx_tcp_monitor_client_write_handler(ngx_event_t *wev) { + ssize_t n, size; ngx_connection_t *c; ngx_tcp_session_t *s; + ngx_tcp_monitor_ctx_t *pctx; + ngx_err_t err; c = wev->data; s = c->data; + ngx_log_debug0(NGX_LOG_DEBUG_TCP, s->connection->log, 0, + "tcp monitor client write handler"); + + if (wev->timedout) { + c->log->action = "monitoring"; + + ngx_log_error(NGX_LOG_INFO, c->log, NGX_ETIMEDOUT, "monitor client send timed out"); + c->timedout = 1; + + ngx_tcp_finalize_session(s); + return; + } + + pctx = ngx_tcp_get_module_ctx(s, ngx_tcp_monitor_module); ngx_log_debug1(NGX_LOG_DEBUG_TCP, wev->log, 0, "tcp monitor client write handler: %d", c->fd); + for ( ;; ) { + if (c->write->ready) { + c->log->action = "client send: sending to client"; + size = pctx->header_out->end - pctx->header_out->pos; + if (size <= 0) { + break; + } + n = c->send(c, pctx->header_out->pos, size); + err = ngx_socket_errno; + + if (n == NGX_ERROR) { + ngx_log_error(NGX_LOG_ERR, c->log, err, "monitor client send error"); + return; + } + if (n > 0) { + pctx->header_out->pos +=n; + continue; + } + } + break; + } + + if (pctx->header_out->pos == pctx->header_out->end) { + ngx_log_error(NGX_LOG_DEBUG, c->log, 0, "client send data done"); + ngx_tcp_finalize_session(s); + return; + } + if (ngx_handle_write_event(wev, 0) != NGX_OK) { ngx_tcp_finalize_session(s); } @@ -342,8 +401,12 @@ ngx_tcp_monitor_client_write_handler(ngx_event_t *wev) static void ngx_tcp_monitor_upstream_read_handler(ngx_event_t *rev) { - ngx_connection_t *c; - ngx_tcp_session_t *s; + ssize_t n, size; + ngx_int_t rc; + ngx_err_t err; + ngx_connection_t *c; + ngx_tcp_session_t *s; + ngx_tcp_monitor_ctx_t *pctx; c = rev->data; s = c->data; @@ -351,6 +414,70 @@ static void ngx_tcp_monitor_upstream_read_handler(ngx_event_t *rev) ngx_log_debug1(NGX_LOG_DEBUG_TCP, rev->log, 0, "tcp monitor upstream read handler: %d", c->fd); + if (rev->timedout) { + c->log->action = "monitoring"; + + ngx_log_error(NGX_LOG_INFO, c->log, NGX_ETIMEDOUT, "monitor timed out"); + c->timedout = 1; + + ngx_tcp_finalize_session(s); + return; + } + + pctx = ngx_tcp_get_module_ctx(s, ngx_tcp_monitor_module); + if (pctx->upstream_response == NULL) { + pctx->upstream_response = ngx_create_temp_buf(c->pool, ngx_pagesize); + if (pctx->upstream_response == NULL) { + ngx_tcp_finalize_session(s); + return; + } + s->bytes_read = 0; + } + + for ( ;; ) { + if (c->read->ready) { + c->log->action = "upstream read: reading from upstream"; + size = (pctx->upstream_response->end + - pctx->upstream_response->start) - s->bytes_read; + n = c->recv(c, pctx->upstream_response->last, size); + err = ngx_socket_errno; + + if (n == NGX_ERROR) { + ngx_log_error(NGX_LOG_ERR, c->log, err, "upstream read error"); + ngx_tcp_finalize_session(s); + return; + } + ngx_log_debug1(NGX_LOG_DEBUG_TCP, rev->log, 0, + "tcp monitor handler recv:%d", n); + + if (n == NGX_AGAIN || n == 0) { + break; + } + + if (n > 0) { + pctx->upstream_response->last += n; + s->bytes_read += n; + continue; + } + + if (n == NGX_ERROR) { + c->read->eof = 1; + } + } + break; + } + + if (c->read->eof || + (*(pctx->upstream_response->last - 1) == LF)) { + rc = ngx_tcp_monitor_build_response(s); + if (rc != NGX_OK) { + ngx_tcp_finalize_session(s); + return; + } + s->connection->write->handler(s->connection->write); + return; + } + if (ngx_handle_read_event(rev, 0) != NGX_OK) { ngx_tcp_finalize_session(s); return; @@ -400,6 +527,10 @@ static void ngx_tcp_monitor_upstream_write_handler(ngx_event_t *wev) pctx->upstream_request_header->start; tail_length = pctx->upstream_request_tail->end - pctx->upstream_request_tail->start; + if (s->bytes_write == (off_t)(header_length + pctx->request_len + tail_length)) { + return; + } + for ( ;; ) { if (c->write->ready) { c->log->action = "upstream send: sending to upstream server"; @@ -499,6 +630,32 @@ ngx_tcp_monitor_build_query(ngx_tcp_session_t *s, ngx_buf_t **header, ngx_buf_t } +static ngx_int_t ngx_tcp_monitor_build_response(ngx_tcp_session_t *s) +{ + u_char chr; + ngx_tcp_monitor_ctx_t *pctx; + + pctx = ngx_tcp_get_module_ctx(s, ngx_tcp_monitor_module); + set_monitor_packet_size(pctx->header_out->start, 0); + if (*(pctx->upstream_response->last - 1) != LF || + *(pctx->upstream_response->last - 2) != CR) { + return NGX_ERROR; + } + chr = *pctx->upstream_response->pos; + switch (chr) { + case '+': + case ':': + set_monitor_return_code(pctx->header_out->start, 0); + break; + case '-': + default: + set_monitor_return_code(pctx->header_out->start, 1); + break; + } + return NGX_OK; +} + + static void ngx_tcp_monitor_init_upstream(ngx_connection_t *c, ngx_tcp_session_t *s) { @@ -595,229 +752,6 @@ ngx_tcp_upstream_init_monitor_handler(ngx_tcp_session_t *s, ngx_tcp_upstream_t * } -#if 0 -static void -ngx_tcp_monitor_handler(ngx_event_t *ev) -{ - char *action, *recv_action, *send_action; - off_t *read_bytes, *write_bytes; - size_t size; - ssize_t n; - ngx_buf_t *b; - ngx_err_t err; - ngx_uint_t do_write; - ngx_connection_t *c, *src, *dst; - ngx_tcp_session_t *s; - ngx_tcp_monitor_conf_t *pcf; - ngx_tcp_monitor_ctx_t *pctx; - ngx_tcp_core_srv_conf_t *cscf; - - c = ev->data; - s = c->data; - - cscf = ngx_tcp_get_module_srv_conf(s, ngx_tcp_core_module); - - if (ev->timedout) { - c->log->action = "monitoring"; - - ngx_log_error(NGX_LOG_INFO, c->log, NGX_ETIMEDOUT, "monitor timed out"); - c->timedout = 1; - - ngx_tcp_finalize_session(s); - return; - } - - pctx = ngx_tcp_get_module_ctx(s, ngx_tcp_monitor_module); - - if (pctx == NULL) { - ngx_tcp_finalize_session(s); - return; - } - - read_bytes = NULL; - write_bytes = NULL; - - if (c == s->connection) { - if (ev->write) { - recv_action = "client write: monitoring and reading from upstream"; - send_action = "client write: monitoring and sending to client"; - src = pctx->upstream->connection; - dst = c; - b = pctx->buffer; - write_bytes = &s->bytes_write; - } else { - recv_action = "client read: monitoring and reading from client"; - send_action = "client read: monitoring and sending to upstream"; - src = c; - dst = pctx->upstream->connection; - b = s->buffer; - read_bytes = &s->bytes_read; - } - - } else { - if (ev->write) { - recv_action = "upstream write: monitoring and reading from client"; - send_action = "upstream write: monitoring and sending to upstream"; - src = s->connection; - dst = c; - b = s->buffer; - read_bytes = &s->bytes_read; - } else { - recv_action = "upstream read: monitoring and reading from upstream"; - send_action = "upstream read: monitoring and sending to client"; - src = c; - dst = s->connection; - b = pctx->buffer; - write_bytes = &s->bytes_write; - } - } - - // FIXME: this is debug code - int packet_size = 0; - if (c == s->connection && src == c) { - printf("packet_size: %d\n", packet_size); - if (*read_bytes >= (off_t)HEADER_LENGTH) { - packet_size = monitor_packet_size(b->pos); - printf("packet_size: %d\n", packet_size); - } - } - - do_write = ev->write ? 1 : 0; - - ngx_log_debug4(NGX_LOG_DEBUG_TCP, ev->log, 0, - "tcp monitor handler: %d, #%d > #%d, time:%ui", - do_write, src->fd, dst->fd, ngx_current_msec); - - pcf = ngx_tcp_get_module_srv_conf(s, ngx_tcp_monitor_module); - - for ( ;; ) { - - if (do_write) { - - size = b->last - b->pos; - - if (size && dst->write->ready) { - c->log->action = send_action; - - n = dst->send(dst, b->pos, size); - err = ngx_socket_errno; - - ngx_log_debug1(NGX_LOG_DEBUG_TCP, ev->log, 0, - "tcp monitor handler send:%d", n); - - if (n == NGX_ERROR) { - ngx_log_error(NGX_LOG_ERR, c->log, err, "monitor send error"); - - ngx_tcp_finalize_session(s); - return; - } - - if (n > 0) { - b->pos += n; - - if (write_bytes) { - *write_bytes += n; - } - - if (b->pos == b->last) { - b->pos = b->start; - b->last = b->start; - } - } - } - } - - size = b->end - b->last; - - if (size) { - if (src->read->ready) { - - c->log->action = recv_action; - - n = src->recv(src, b->last, size); - err = ngx_socket_errno; - - ngx_log_debug1(NGX_LOG_DEBUG_TCP, ev->log, 0, - "tcp monitor handler recv:%d", n); - - if (n == NGX_AGAIN || n == 0) { - break; - } - - if (n > 0) { - do_write = 1; - b->last += n; - - if (read_bytes) { - *read_bytes += n; - } - - continue; - } - - if (n == NGX_ERROR) { - src->read->eof = 1; - } - } - } - - break; - } - - c->log->action = "nginx tcp monitoring"; - - if ((s->connection->read->eof && s->buffer->pos == s->buffer->last) - || (pctx->upstream->connection->read->eof - && pctx->buffer->pos == pctx->buffer->last) - || (s->connection->read->eof - && pctx->upstream->connection->read->eof)) - { - action = c->log->action; - c->log->action = NULL; - ngx_log_error(NGX_LOG_DEBUG, c->log, 0, "monitor session done"); - c->log->action = action; - - ngx_tcp_finalize_session(s); - return; - } - - if (ngx_handle_write_event(dst->write, 0) != NGX_OK) { - ngx_tcp_finalize_session(s); - return; - } - - if (ngx_handle_read_event(dst->read, 0) != NGX_OK) { - ngx_tcp_finalize_session(s); - return; - } - - if (ngx_handle_write_event(src->write, 0) != NGX_OK) { - ngx_tcp_finalize_session(s); - return; - } - - if (ngx_handle_read_event(src->read, 0) != NGX_OK) { - ngx_tcp_finalize_session(s); - return; - } - - if (c == s->connection) { - ngx_add_timer(c->read, cscf->timeout); - } - - if (c == pctx->upstream->connection) { - if (ev->write) { - ngx_add_timer(c->write, pcf->upstream.send_timeout); - } else { - ngx_add_timer(c->read, pcf->upstream.read_timeout); - } - } - - return; -} -#endif - - static char * ngx_tcp_monitor_pass(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) { From 32bc82ceed69a8974e6bf94c7c02c1010a47a631 Mon Sep 17 00:00:00 2001 From: ideal Date: Wed, 11 Dec 2013 17:33:03 +0800 Subject: [PATCH 31/34] it's better add __attribute__ ((packed)) --- modules/ngx_tcp_monitor_server_module.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/ngx_tcp_monitor_server_module.c b/modules/ngx_tcp_monitor_server_module.c index b79af16..3cd5b3f 100644 --- a/modules/ngx_tcp_monitor_server_module.c +++ b/modules/ngx_tcp_monitor_server_module.c @@ -18,7 +18,7 @@ typedef struct ngx_tcp_monitor_header_s { uint32_t length; uint16_t type; uint16_t spare0; -} ngx_tcp_monitor_header_t; +} __attribute__ ((packed)) ngx_tcp_monitor_header_t; #define HEADER_LENGTH sizeof(ngx_tcp_monitor_header_t) From b823978a5d90bf6a4a32c97f624e024c9e962804 Mon Sep 17 00:00:00 2001 From: ideal Date: Sun, 15 Dec 2013 16:42:26 +0800 Subject: [PATCH 32/34] add rpc server module --- config | 4 +- modules/ngx_tcp_rpc_server_module.c | 176 ++++++++++++++++++++++++++++ 2 files changed, 178 insertions(+), 2 deletions(-) create mode 100644 modules/ngx_tcp_rpc_server_module.c diff --git a/config b/config index 977e576..f94a564 100644 --- a/config +++ b/config @@ -4,7 +4,7 @@ ngx_feature_run=no ngx_feature_incs= ngx_feature_path="$ngx_addon_dir/modules $ngx_addon_dir/parsers $ngx_addon_dir" ngx_feature_deps="$ngx_addon_dir/ngx_tcp.h $ngx_addon_dir/ngx_tcp_session.h $ngx_addon_dir/ngx_tcp_upstream.h $ngx_addon_dir/ngx_tcp_upstream_check.h $ngx_addon_dir/ngx_tcp_upstream_round_robin.h $ngx_addon_dir/ngx_tcp_upstream_keepalive.h" -ngx_tcp_src="$ngx_addon_dir/ngx_tcp.c $ngx_addon_dir/ngx_tcp_core_module.c $ngx_addon_dir/ngx_tcp_session.c $ngx_addon_dir/ngx_tcp_access.c $ngx_addon_dir/ngx_tcp_log.c $ngx_addon_dir/ngx_tcp_upstream.c $ngx_addon_dir/ngx_tcp_upstream_round_robin.c $ngx_addon_dir/modules/ngx_tcp_generic_proxy_module.c $ngx_addon_dir/modules/ngx_tcp_websocket_proxy_module.c $ngx_addon_dir/modules/ngx_tcp_upstream_ip_hash_module.c $ngx_addon_dir/modules/ngx_tcp_upstream_busyness_module.c $ngx_addon_dir/ngx_tcp_upstream_check.c $ngx_addon_dir/ngx_tcp_upstream_keepalive.c $ngx_addon_dir/modules/ngx_tcp_monitor_server_module.c" +ngx_tcp_src="$ngx_addon_dir/ngx_tcp.c $ngx_addon_dir/ngx_tcp_core_module.c $ngx_addon_dir/ngx_tcp_session.c $ngx_addon_dir/ngx_tcp_access.c $ngx_addon_dir/ngx_tcp_log.c $ngx_addon_dir/ngx_tcp_upstream.c $ngx_addon_dir/ngx_tcp_upstream_round_robin.c $ngx_addon_dir/modules/ngx_tcp_generic_proxy_module.c $ngx_addon_dir/modules/ngx_tcp_websocket_proxy_module.c $ngx_addon_dir/modules/ngx_tcp_upstream_ip_hash_module.c $ngx_addon_dir/modules/ngx_tcp_upstream_busyness_module.c $ngx_addon_dir/ngx_tcp_upstream_check.c $ngx_addon_dir/ngx_tcp_upstream_keepalive.c $ngx_addon_dir/modules/ngx_tcp_monitor_server_module.c $ngx_addon_dir/modules/ngx_tcp_rpc_server_module.c" ngx_tcp_ssl_deps="$ngx_addon_dir/modules/ngx_tcp_ssl_module.h" ngx_tcp_ssl_src="$ngx_addon_dir/modules/ngx_tcp_ssl_module.c" ngx_tcp_parser_deps="$ngx_addon_dir/parsers/parser.h $ngx_addon_dir/parsers/http_request_parser.h $ngx_addon_dir/parsers/http_response_parser.h $ngx_addon_dir/parsers/smtp_response_parser.h" @@ -17,7 +17,7 @@ if [ $ngx_found = yes ]; then ngx_addon_name=ngx_tcp_module TCP_CORE_MODULES="ngx_tcp_module ngx_tcp_core_module ngx_tcp_upstream_module" - TCP_MODULES="ngx_tcp_proxy_module ngx_tcp_websocket_module ngx_tcp_upstream_ip_hash_module ngx_tcp_upstream_busyness_module ngx_tcp_upstream_keepalive_module ngx_tcp_monitor_module" + TCP_MODULES="ngx_tcp_proxy_module ngx_tcp_websocket_module ngx_tcp_upstream_ip_hash_module ngx_tcp_upstream_busyness_module ngx_tcp_upstream_keepalive_module ngx_tcp_monitor_module ngx_tcp_rpc_module" NGX_ADDON_DEPS="$NGX_ADDON_DEPS $ngx_feature_deps $ngx_tcp_parser_deps" NGX_ADDON_SRCS="$NGX_ADDON_SRCS $ngx_tcp_src $ngx_tcp_parser_src" diff --git a/modules/ngx_tcp_rpc_server_module.c b/modules/ngx_tcp_rpc_server_module.c new file mode 100644 index 0000000..24ae9cd --- /dev/null +++ b/modules/ngx_tcp_rpc_server_module.c @@ -0,0 +1,176 @@ +/* + * Copyright (C) 2013 Shang Yuanchun + * + */ + +#include +#include +#include + +/* + * header: |---- 4 ----|-- 2 --|-- 2 --| + * length type padding + * + * all are little endian + * + */ +typedef struct ngx_tcp_rpc_header_s { + uint32_t length; + uint32_t magic; + uint16_t type; + uint16_t version; + uint32_t spare0; +} __attribute__ ((packed)) ngx_tcp_rpc_header_t; + +#define HEADER_LENGTH sizeof(ngx_tcp_rpc_header_t) + +#define rpc_packet_size(ptr) (*(u_char *)(ptr) + \ + (*((u_char *)(ptr) + 1) << 8) + \ + (*((u_char *)(ptr) + 2) << 16) + \ + (*((u_char *)(ptr) + 3) << 24) ) + +#define RPC_TYPE_OFFSET offsetof(ngx_tcp_rpc_header_t, type) +#define rpc_packet_type(ptr) (*((u_char *)(ptr) + RPC_TYPE_OFFSET) + \ + (*((u_char *)(ptr) + RPC_TYPE_OFFSET + 1) << 8) ) + +#define PACKET_TYPE_JSON 1 +#define PACKET_TYPE_TLV 2 +#define PACKET_TYPE_BSON 3 +#define PACKET_TYPE_MSGPACK 4 + +typedef struct ngx_tcp_rpc_ctx_s { + ngx_peer_connection_t *upstream; + + // ngx_tcp_session_t's buffer is header_in + // request_body is the request body + ngx_buf_t *request_body; + ngx_uint_t request_len; + + ngx_buf_t *header_out; +} ngx_tcp_rpc_ctx_t; + + +typedef struct ngx_tcp_rpc_conf_s { + ngx_tcp_upstream_conf_t upstream; + ngx_int_t rpc_server; + ngx_str_t document_root; +} ngx_tcp_rpc_conf_t; + +static void ngx_tcp_rpc_init_session(ngx_tcp_session_t *s); +static void ngx_tcp_rpc_client_read_handler(ngx_event_t *rev); +static void ngx_tcp_rpc_client_write_handler(ngx_event_t *wev); +static void *ngx_tcp_rpc_create_conf(ngx_conf_t *cf); +static char *ngx_tcp_rpc_merge_conf(ngx_conf_t *cf, void *parent, + void *child); + +static ngx_tcp_protocol_t ngx_tcp_rpc_protocol = { + + ngx_string("rpc_server"), + { 0, 0, 0, 0 }, + NGX_TCP_GENERIC_PROTOCOL, + ngx_tcp_rpc_init_session, + NULL, + NULL, + ngx_string("500 Internal server error" CRLF) + +}; + + +static ngx_command_t ngx_tcp_rpc_commands[] = { + + { ngx_string("rpc_server"), + NGX_TCP_MAIN_CONF|NGX_TCP_SRV_CONF|NGX_CONF_TAKE1, + ngx_conf_set_flag_slot, + NGX_TCP_SRV_CONF_OFFSET, + offsetof(ngx_tcp_rpc_conf_t, rpc_server), + NULL }, + + { ngx_string("root"), + NGX_TCP_SRV_CONF|NGX_CONF_TAKE1, + ngx_conf_set_str_slot, + NGX_TCP_SRV_CONF_OFFSET, + offsetof(ngx_tcp_rpc_conf_t, document_root), + NULL }, + + ngx_null_command +}; + + +static ngx_tcp_module_t ngx_tcp_rpc_module_ctx = { + &ngx_tcp_rpc_protocol, /* protocol */ + + NULL, /* create main configuration */ + NULL, /* init main configuration */ + + ngx_tcp_rpc_create_conf, /* create server configuration */ + ngx_tcp_rpc_merge_conf /* merge server configuration */ +}; + + +ngx_module_t ngx_tcp_rpc_module = { + NGX_MODULE_V1, + &ngx_tcp_rpc_module_ctx, /* module context */ + ngx_tcp_rpc_commands, /* module directives */ + NGX_TCP_MODULE, /* module type */ + NULL, /* init master */ + NULL, /* init module */ + NULL, /* init process */ + NULL, /* init thread */ + NULL, /* exit thread */ + NULL, /* exit process */ + NULL, /* exit master */ + NGX_MODULE_V1_PADDING +}; + + +static void +ngx_tcp_rpc_init_session(ngx_tcp_session_t *s) +{ + return; +} + + +static void +ngx_tcp_rpc_client_read_handler(ngx_event_t *rev) +{ + return; +} + + +static void +ngx_tcp_rpc_client_write_handler(ngx_event_t *wev) +{ + return; +} + + +static void * +ngx_tcp_rpc_create_conf(ngx_conf_t *cf) +{ + ngx_tcp_rpc_conf_t *pcf; + + pcf = ngx_pcalloc(cf->pool, sizeof(ngx_tcp_rpc_conf_t)); + if (pcf == NULL) { + return NULL; + } + + return pcf; +} + + +#define unused(arg) (void)(arg) + +static char * +ngx_tcp_rpc_merge_conf(ngx_conf_t *cf, void *parent, void *child) +{ + ngx_tcp_rpc_conf_t *prev = parent; + ngx_tcp_rpc_conf_t *conf = child; + + unused(prev); + unused(conf); + + unused(ngx_tcp_rpc_client_read_handler); + unused(ngx_tcp_rpc_client_write_handler); + + return NGX_CONF_OK; +} From 2e8f848ce7ca3797e69af7130823b30133a99b1b Mon Sep 17 00:00:00 2001 From: ideal Date: Sun, 15 Dec 2013 16:43:22 +0800 Subject: [PATCH 33/34] remove end blank --- modules/ngx_tcp_rpc_server_module.c | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/modules/ngx_tcp_rpc_server_module.c b/modules/ngx_tcp_rpc_server_module.c index 24ae9cd..907304a 100644 --- a/modules/ngx_tcp_rpc_server_module.c +++ b/modules/ngx_tcp_rpc_server_module.c @@ -56,7 +56,7 @@ typedef struct ngx_tcp_rpc_conf_s { ngx_str_t document_root; } ngx_tcp_rpc_conf_t; -static void ngx_tcp_rpc_init_session(ngx_tcp_session_t *s); +static void ngx_tcp_rpc_init_session(ngx_tcp_session_t *s); static void ngx_tcp_rpc_client_read_handler(ngx_event_t *rev); static void ngx_tcp_rpc_client_write_handler(ngx_event_t *wev); static void *ngx_tcp_rpc_create_conf(ngx_conf_t *cf); @@ -123,29 +123,29 @@ ngx_module_t ngx_tcp_rpc_module = { }; -static void -ngx_tcp_rpc_init_session(ngx_tcp_session_t *s) +static void +ngx_tcp_rpc_init_session(ngx_tcp_session_t *s) { return; } static void -ngx_tcp_rpc_client_read_handler(ngx_event_t *rev) +ngx_tcp_rpc_client_read_handler(ngx_event_t *rev) { return; } static void -ngx_tcp_rpc_client_write_handler(ngx_event_t *wev) +ngx_tcp_rpc_client_write_handler(ngx_event_t *wev) { return; } static void * -ngx_tcp_rpc_create_conf(ngx_conf_t *cf) +ngx_tcp_rpc_create_conf(ngx_conf_t *cf) { ngx_tcp_rpc_conf_t *pcf; @@ -161,7 +161,7 @@ ngx_tcp_rpc_create_conf(ngx_conf_t *cf) #define unused(arg) (void)(arg) static char * -ngx_tcp_rpc_merge_conf(ngx_conf_t *cf, void *parent, void *child) +ngx_tcp_rpc_merge_conf(ngx_conf_t *cf, void *parent, void *child) { ngx_tcp_rpc_conf_t *prev = parent; ngx_tcp_rpc_conf_t *conf = child; From 6d1424279a1f94a913d83fc79a8fd057cd2b24da Mon Sep 17 00:00:00 2001 From: ideal Date: Fri, 20 Dec 2013 15:26:36 +0800 Subject: [PATCH 34/34] init first_read --- modules/ngx_tcp_generic_proxy_module.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/modules/ngx_tcp_generic_proxy_module.c b/modules/ngx_tcp_generic_proxy_module.c index a65ed8b..8d8ad27 100644 --- a/modules/ngx_tcp_generic_proxy_module.c +++ b/modules/ngx_tcp_generic_proxy_module.c @@ -391,6 +391,8 @@ ngx_tcp_proxy_handler(ngx_event_t *ev) /* SSL Need this */ if (s->connection->ssl) { first_read = 1; + } else { + first_read = 0; } #else first_read = 0;