From dcef7dc73d80f2b6e754fee4d439183c66a640dd Mon Sep 17 00:00:00 2001 From: anttil Date: Sat, 16 Sep 2017 18:07:53 +0300 Subject: [PATCH] Document config options. --- src/pg_kafka_events.c | 26 ++++++++++++++++---------- 1 file changed, 16 insertions(+), 10 deletions(-) diff --git a/src/pg_kafka_events.c b/src/pg_kafka_events.c index 318ec53..663005e 100644 --- a/src/pg_kafka_events.c +++ b/src/pg_kafka_events.c @@ -36,7 +36,8 @@ static rd_kafka_t* pg_kafka_open_producer() { char str[512]; rd_kafka_conf_t* conf = rd_kafka_conf_new(); - if(rd_kafka_conf_set(conf, "bootstrap.servers", KAFKA_SERVERS, str, sizeof(str)) != RD_KAFKA_CONF_OK) { + if(rd_kafka_conf_set(conf, "bootstrap.servers", KAFKA_SERVERS, str, sizeof(str)) + != RD_KAFKA_CONF_OK) { ereport(WARNING, (errmsg("rd_kafka_conf_set: %s", str))); } rd_kafka_t* producer; @@ -65,26 +66,32 @@ static int pg_kafka_has_msg_in_buffer() return 0; } +static bool pg_kafka_skip_message(char* msg) +{ + return strstr(msg, "{\"type\":\"transaction.") != NULL; // skip transaction.begin/commit +} + static void pg_kafka_publish_messages(char* str, int r, rd_kafka_t* producer, rd_kafka_topic_t* topic) { memcpy(buf + buf_index, str, r); buf_index += r; for(int len = pg_kafka_has_msg_in_buffer(); len; len = pg_kafka_has_msg_in_buffer()) { - char msg[len + 1]; + char* msg = malloc(len + 1); memcpy(msg, buf, len); msg[len] = '\0'; - if(strstr(msg, "{\"type\":\"transaction.") == NULL) { // skip transaction.begin/commit + if(!pg_kafka_skip_message(msg)) { if(rd_kafka_produce(topic, RD_KAFKA_PARTITION_UA, - RD_KAFKA_MSG_F_COPY, msg, len, - NULL, 0, NULL) == -1) { - ereport(WARNING, (errmsg("rd_kafka_produce: %s", rd_kafka_err2str(rd_kafka_last_error())))); - } else { + RD_KAFKA_MSG_F_FREE, msg, len, + NULL, 0, NULL) != -1) { ereport(NOTICE, (errmsg("published kafka msg: '%s'", msg))); + } else { + ereport(WARNING, (errmsg("rd_kafka_produce: %s", + rd_kafka_err2str(rd_kafka_last_error())))); } } - + if(len != buf_index - 2) { memcpy(buf, buf + len + 1, buf_index - len); buf_index = buf_index - len - 1; @@ -117,7 +124,6 @@ static void pg_kafka_publisher_loop(int pipe) pg_kafka_log_err("read failed"); break; } else if (r == 0) { - pg_kafka_log_err("pipe closed"); break; } pg_kafka_publish_messages(str, r, producer, topic); @@ -137,7 +143,7 @@ static void pg_kafka_exec_revclocal(int* pipes) "--start", "-f", "-", "-S", "kafka_events", - "-P", "decoding_json", + "-P", RECVLOGICAL_DECODER, "-d", DATABASE_NAME, NULL); pg_kafka_log_err("recvlogical failed");