Skip to content

Commit

Permalink
Document config options.
Browse files Browse the repository at this point in the history
  • Loading branch information
alaisi committed Sep 16, 2017
1 parent 3de6d73 commit dcef7dc
Showing 1 changed file with 16 additions and 10 deletions.
26 changes: 16 additions & 10 deletions src/pg_kafka_events.c
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ static rd_kafka_t* pg_kafka_open_producer()
{
char str[512];
rd_kafka_conf_t* conf = rd_kafka_conf_new();
if(rd_kafka_conf_set(conf, "bootstrap.servers", KAFKA_SERVERS, str, sizeof(str)) != RD_KAFKA_CONF_OK) {
if(rd_kafka_conf_set(conf, "bootstrap.servers", KAFKA_SERVERS, str, sizeof(str))
!= RD_KAFKA_CONF_OK) {
ereport(WARNING, (errmsg("rd_kafka_conf_set: %s", str)));
}
rd_kafka_t* producer;
Expand Down Expand Up @@ -65,26 +66,32 @@ static int pg_kafka_has_msg_in_buffer()
return 0;
}

static bool pg_kafka_skip_message(char* msg)
{
return strstr(msg, "{\"type\":\"transaction.") != NULL; // skip transaction.begin/commit
}

static void pg_kafka_publish_messages(char* str, int r, rd_kafka_t* producer, rd_kafka_topic_t* topic)
{
memcpy(buf + buf_index, str, r);
buf_index += r;

for(int len = pg_kafka_has_msg_in_buffer(); len; len = pg_kafka_has_msg_in_buffer()) {
char msg[len + 1];
char* msg = malloc(len + 1);
memcpy(msg, buf, len);
msg[len] = '\0';

if(strstr(msg, "{\"type\":\"transaction.") == NULL) { // skip transaction.begin/commit
if(!pg_kafka_skip_message(msg)) {
if(rd_kafka_produce(topic, RD_KAFKA_PARTITION_UA,
RD_KAFKA_MSG_F_COPY, msg, len,
NULL, 0, NULL) == -1) {
ereport(WARNING, (errmsg("rd_kafka_produce: %s", rd_kafka_err2str(rd_kafka_last_error()))));
} else {
RD_KAFKA_MSG_F_FREE, msg, len,
NULL, 0, NULL) != -1) {
ereport(NOTICE, (errmsg("published kafka msg: '%s'", msg)));
} else {
ereport(WARNING, (errmsg("rd_kafka_produce: %s",
rd_kafka_err2str(rd_kafka_last_error()))));
}
}

if(len != buf_index - 2) {
memcpy(buf, buf + len + 1, buf_index - len);
buf_index = buf_index - len - 1;
Expand Down Expand Up @@ -117,7 +124,6 @@ static void pg_kafka_publisher_loop(int pipe)
pg_kafka_log_err("read failed");
break;
} else if (r == 0) {
pg_kafka_log_err("pipe closed");
break;
}
pg_kafka_publish_messages(str, r, producer, topic);
Expand All @@ -137,7 +143,7 @@ static void pg_kafka_exec_revclocal(int* pipes)
"--start",
"-f", "-",
"-S", "kafka_events",
"-P", "decoding_json",
"-P", RECVLOGICAL_DECODER,
"-d", DATABASE_NAME,
NULL);
pg_kafka_log_err("recvlogical failed");
Expand Down

0 comments on commit dcef7dc

Please sign in to comment.