diff --git a/src/main/config/log4j.prod.properties b/src/main/config/log4j.prod.properties index fce4ca6c7..7067a9050 100644 --- a/src/main/config/log4j.prod.properties +++ b/src/main/config/log4j.prod.properties @@ -2,9 +2,12 @@ # root logger. log4j.rootLogger=DEBUG, CONSOLE, ROLLINGFILE +log4j.logger.org.apache.kafka.clients.consumer.internals.Fetcher=DEBUG +log4j.logger.com.pinterest.secor.consumer.Consumer=DEBUG +log4j.logger.com.pinterest.secor.common.SingleProcessCounter=DEBUG log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender -log4j.appender.CONSOLE.Threshold=WARN +log4j.appender.CONSOLE.Threshold=INFO log4j.appender.CONSOLE.Target=System.err log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] (%C:%L) %-5p %m%n diff --git a/src/main/java/com/pinterest/secor/common/FileRegistry.java b/src/main/java/com/pinterest/secor/common/FileRegistry.java index 558921f7c..cdf1968dc 100644 --- a/src/main/java/com/pinterest/secor/common/FileRegistry.java +++ b/src/main/java/com/pinterest/secor/common/FileRegistry.java @@ -27,11 +27,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; -import java.util.Arrays; -import java.util.Collection; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Set; +import java.util.*; /** * FileRegistry keeps track of local log files currently being appended to and the associated diff --git a/src/main/java/com/pinterest/secor/common/LegacyKafkaClient.java b/src/main/java/com/pinterest/secor/common/LegacyKafkaClient.java index 01d91741b..3d09a7fca 100644 --- a/src/main/java/com/pinterest/secor/common/LegacyKafkaClient.java +++ b/src/main/java/com/pinterest/secor/common/LegacyKafkaClient.java @@ -24,13 +24,7 @@ import kafka.api.FetchRequestBuilder; import kafka.api.PartitionOffsetRequestInfo; import kafka.common.TopicAndPartition; -import kafka.javaapi.FetchResponse; -import kafka.javaapi.OffsetRequest; -import kafka.javaapi.OffsetResponse; -import kafka.javaapi.PartitionMetadata; -import kafka.javaapi.TopicMetadata; -import kafka.javaapi.TopicMetadataRequest; -import kafka.javaapi.TopicMetadataResponse; +import kafka.javaapi.*; import kafka.javaapi.consumer.SimpleConsumer; import kafka.message.MessageAndOffset; import org.apache.kafka.common.protocol.Errors; @@ -234,8 +228,8 @@ public Message getCommittedMessage(TopicPartition topicPartition) throws Excepti } return getMessage(topicPartition, committedOffset, consumer); } catch (MessageDoesNotExistException e) { - // If a MessageDoesNotExistException exception is raised, - // the message at the last committed offset does not exist in Kafka. + // If a MessageDoesNotExistException exception is raised, + // the message at the last committed offset does not exist in Kafka. // This is usually due to the message being compacted away by the // Kafka log compaction process. 
            //
diff --git a/src/main/java/com/pinterest/secor/common/OffsetTracker.java b/src/main/java/com/pinterest/secor/common/OffsetTracker.java
index 1ec167da3..68cb90fbe 100644
--- a/src/main/java/com/pinterest/secor/common/OffsetTracker.java
+++ b/src/main/java/com/pinterest/secor/common/OffsetTracker.java
@@ -97,4 +97,37 @@ public long setCommittedOffsetCount(TopicPartition topicPartition, long count) {
         mCommittedOffsetCount.put(topicPartition, count);
         return trueCommittedOffsetCount;
     }
+
+
+    public String toString() {
+
+        StringBuilder sb = new StringBuilder();
+        sb.append("Topic Offset dump: \n");
+        sb.append("First Seen offset:\n");
+        dump(mFirstSeendOffset, sb);
+
+        sb.append("Last Seen offset:\n");
+        dump(mLastSeenOffset, sb);
+
+        sb.append("Committed offset: \n");
+        dump(mCommittedOffsetCount, sb);
+
+        return sb.toString();
+
+    }
+
+    private StringBuilder dump(HashMap<TopicPartition, Long> offsetMap, StringBuilder sb) {
+        offsetMap.forEach((tp, offset) -> {
+            sb
+                    .append("[")
+                    .append(tp.toString())
+                    .append(", Offset:" + offset)
+                    .append("]")
+                    .append("\n");
+        });
+
+        return sb;
+    }
+
+
 }
diff --git a/src/main/java/com/pinterest/secor/common/SecorConfig.java b/src/main/java/com/pinterest/secor/common/SecorConfig.java
index 531d72286..99c6e1908 100644
--- a/src/main/java/com/pinterest/secor/common/SecorConfig.java
+++ b/src/main/java/com/pinterest/secor/common/SecorConfig.java
@@ -225,6 +225,10 @@ public String getFetchMinBytes() {
         return getString("kafka.fetch.min.bytes");
     }
 
+    public String getMetaDataRefreshInterval() {
+        return getString("kafka.metadata.max.age.ms", "90000");
+    }
+
     public String getFetchMaxBytes() {
         return getString("kafka.fetch.max.bytes");
     }
diff --git a/src/main/java/com/pinterest/secor/common/SingleProcessCounter.java b/src/main/java/com/pinterest/secor/common/SingleProcessCounter.java
new file mode 100644
index 000000000..523666a80
--- /dev/null
+++ b/src/main/java/com/pinterest/secor/common/SingleProcessCounter.java
@@ -0,0 +1,89 @@
+package com.pinterest.secor.common;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class SingleProcessCounter {
+    private static final Logger LOG = LoggerFactory.getLogger(SingleProcessCounter.class);
+
+    private ConcurrentHashMap<TopicPartition, Long> mMessageUploadCounter;
+
+    private ConcurrentHashMap<TopicPartition, Long> mMessageLocalCounter;
+
+    private static volatile SingleProcessCounter counter = null;
+
+    private static Object lock = new Object();
+
+    private SingleProcessCounter() {
+        mMessageLocalCounter = new ConcurrentHashMap<>();
+        mMessageUploadCounter = new ConcurrentHashMap<>();
+    }
+
+    public static SingleProcessCounter getSingleProcessCounter() {
+        if (counter != null) return counter;
+
+        synchronized (lock) {
+            if (counter == null)
+                counter = new SingleProcessCounter();
+        }
+        return counter;
+    }
+
+    public void increment(TopicPartition tp, Long delta) {
+        long bufferValue = mMessageLocalCounter.merge(tp, delta, (v_old, v_delta) -> v_old + v_delta);
+
+        if (LOG.isDebugEnabled())
+            LOG.debug("Topic {} Partition {} local message {}", tp.getTopic(), tp.getPartition(), bufferValue);
+
+    }
+
+    public void decrement(TopicPartition tp, Long delta) {
+        long bufferValue = mMessageLocalCounter.merge(tp, delta, (v_old, v_delta) -> v_old - v_delta);
+
+        if (LOG.isDebugEnabled())
+            LOG.debug("Topic {} Partition {} local message {}", tp.getTopic(), tp.getPartition(), bufferValue);
+    }
+
+    public void topicUploaded(TopicPartition tp) {
+        long counter = getLocalCounter(tp);
+        mMessageUploadCounter.merge(tp, counter, (v_old, v_delta) -> v_old + v_delta);
+        decrement(tp, counter);
+    }
+
+    public long getLocalCounter(TopicPartition tp) {
+        return mMessageLocalCounter.getOrDefault(tp, 0l);
+    }
+
+    public long getTotalCounter(TopicPartition tp) {
+        return mMessageLocalCounter.values().stream().reduce((a, b) -> a + b).orElse(0l) + mMessageUploadCounter.values().stream().reduce((a, b) -> a + b).orElse(0l);
+
+    }
+
+    public String toString() {
+
+        StringBuilder sb = new StringBuilder();
+        sb.append("Message completed stats: \n");
+        toString(mMessageLocalCounter, sb, "Current local Msg written counter: ");
+        toString(mMessageUploadCounter, sb, "Uploaded Msg counter ");
+
+        return sb.toString();
+    }
+
+    private void toString(Map<TopicPartition, Long> map, StringBuilder sb, String msg) {
+        map.forEach((tp, offset) -> {
+            sb
+                    .append("[")
+                    .append(tp.toString())
+                    .append("," + msg + offset)
+                    .append("]")
+                    .append("\n");
+        });
+    }
+
+    public void resetLocalCount(TopicPartition topicPartition) {
+        mMessageLocalCounter.merge(topicPartition, 0l, (v_old, v_set) -> v_set);
+    }
+}
diff --git a/src/main/java/com/pinterest/secor/consumer/Consumer.java b/src/main/java/com/pinterest/secor/consumer/Consumer.java
index 74a925227..8422e9ce0 100644
--- a/src/main/java/com/pinterest/secor/consumer/Consumer.java
+++ b/src/main/java/com/pinterest/secor/consumer/Consumer.java
@@ -22,6 +22,8 @@
 import com.pinterest.secor.common.FileRegistry;
 import com.pinterest.secor.common.OffsetTracker;
 import com.pinterest.secor.common.SecorConfig;
+import com.pinterest.secor.common.SingleProcessCounter;
+import com.pinterest.secor.common.TopicPartition;
 import com.pinterest.secor.message.Message;
 import com.pinterest.secor.message.ParsedMessage;
 import com.pinterest.secor.monitoring.MetricCollector;
@@ -75,6 +77,7 @@ public class Consumer extends Thread {
     private boolean mUploadOnShutdown;
     private volatile boolean mShuttingDown = false;
     private static volatile boolean mCallingSystemExit = false;
+    private static SingleProcessCounter spc = SingleProcessCounter.getSingleProcessCounter();
 
     public Consumer(SecorConfig config) {
         mConfig = config;
@@ -156,8 +159,9 @@ public void run() {
         // check upload policy every N seconds or 10,000 messages/consumer timeouts
         long checkEveryNSeconds = Math.min(10 * 60, mConfig.getMaxFileAgeSeconds() / 2);
         long checkMessagesPerSecond = mConfig.getMessagesPerSecond();
-        long nMessages = 0;
+        long nMsgPulls = 0;
         long lastChecked = System.currentTimeMillis();
+        long timeStamp = System.currentTimeMillis();
         while (true) {
             boolean hasMoreMessages = consumeNextMessage();
             if (!hasMoreMessages) {
@@ -170,8 +174,14 @@
             }
             long now = System.currentTimeMillis();
 
+
+            if (nMsgPulls % 1000 == 0 || now - timeStamp > 60 * 1000) {
+                LOG.info("nMsgPulls: " + nMsgPulls + " lastChecked: " + lastChecked);
+                timeStamp = now;
+            }
+
             if (mDeterministicUploadPolicyTracker != null ||
-                nMessages++ % checkMessagesPerSecond == 0 ||
+                nMsgPulls++ % checkMessagesPerSecond == 0 ||
                 (now - lastChecked) > checkEveryNSeconds * 1000) {
                 lastChecked = now;
                 checkUploadPolicy(false);
@@ -191,6 +201,7 @@
     protected void checkUploadPolicy(boolean forceUpload) {
         try {
+            LOG.info("checkUploadPolicy invoked, " + mOffsetTracker.toString() + ", " + spc.toString());
             mUploader.applyPolicy(forceUpload);
         } catch (Exception e) {
             throw new RuntimeException("Failed to apply upload policy", e);
         }
@@ -242,7 +253,8 @@ protected boolean consumeNextMessage() {
         if (parsedMessage != null) {
             try {
                 mMessageWriter.write(parsedMessage);
-
+                spc.increment(new TopicPartition(rawMessage.getTopic(),
+                        rawMessage.getKafkaPartition()), 1l);
                 mMetricCollector.metric("consumer.message_size_bytes", rawMessage.getPayload().length, rawMessage.getTopic());
                 mMetricCollector.increment("consumer.throughput_bytes", rawMessage.getPayload().length, rawMessage.getTopic());
             } catch (Exception e) {
@@ -252,6 +264,8 @@
                 if (LOG.isTraceEnabled()) {
                     LOG.trace("Failed to write message " + parsedMessage, e);
                 }
+                // Don't swallow the exception, but force-upload the data that is already available so that the next run can succeed.
+                checkUploadPolicy(true);
                 throw new RuntimeException("Failed to write message " + parsedMessage.toTruncatedString(), e);
             }
             if (mDeterministicUploadPolicyTracker != null) {
diff --git a/src/main/java/com/pinterest/secor/io/impl/JsonORCFileReaderWriterFactory.java b/src/main/java/com/pinterest/secor/io/impl/JsonORCFileReaderWriterFactory.java
index a2a825dde..c95e5abaf 100644
--- a/src/main/java/com/pinterest/secor/io/impl/JsonORCFileReaderWriterFactory.java
+++ b/src/main/java/com/pinterest/secor/io/impl/JsonORCFileReaderWriterFactory.java
@@ -18,28 +18,6 @@
  */
 package com.pinterest.secor.io.impl;
 
-import java.io.IOException;
-import java.io.StringWriter;
-import java.util.List;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.Lz4Codec;
-import org.apache.hadoop.io.compress.SnappyCodec;
-import org.apache.hadoop.io.compress.GzipCodec;
-import org.apache.orc.CompressionKind;
-import org.apache.orc.OrcFile;
-import org.apache.orc.Reader;
-import org.apache.orc.RecordReader;
-import org.apache.orc.TypeDescription;
-import org.apache.orc.Writer;
-import org.codehaus.jettison.json.JSONException;
-import org.codehaus.jettison.json.JSONWriter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import com.google.gson.Gson;
 import com.google.gson.JsonObject;
 import com.pinterest.secor.common.FileRegistry;
@@ -54,6 +32,22 @@
 import com.pinterest.secor.util.orc.VectorColumnFiller;
 import com.pinterest.secor.util.orc.VectorColumnFiller.JsonConverter;
 import com.pinterest.secor.util.orc.schema.ORCSchemaProvider;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.io.compress.Lz4Codec;
+import org.apache.hadoop.io.compress.SnappyCodec;
+import org.apache.orc.*;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.List;
 
 /**
  * ORC reader/writer implementation
@@ -155,7 +149,7 @@ public JsonORCFileWriter(LogFilePath logFilePath, CompressionCodec codec)
         if (schema == null) {
             String topic = logFilePath.getTopic();
             throw new IllegalArgumentException(
-                String.format("No schema is provided for topic '%s'", topic));
+                    String.format("No schema is provided for topic '%s'", topic));
         }
         List<TypeDescription> fieldTypes = schema.getChildren();
         converters = new JsonConverter[fieldTypes.size()];
diff --git a/src/main/java/com/pinterest/secor/main/LogFilePrinterMain.java
b/src/main/java/com/pinterest/secor/main/LogFilePrinterMain.java index efdbd0944..6fe7c4405 100644 --- a/src/main/java/com/pinterest/secor/main/LogFilePrinterMain.java +++ b/src/main/java/com/pinterest/secor/main/LogFilePrinterMain.java @@ -21,12 +21,7 @@ import com.pinterest.secor.common.SecorConfig; import com.pinterest.secor.tools.LogFilePrinter; import com.pinterest.secor.util.FileUtil; -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.CommandLineParser; -import org.apache.commons.cli.GnuParser; -import org.apache.commons.cli.OptionBuilder; -import org.apache.commons.cli.Options; -import org.apache.commons.cli.ParseException; +import org.apache.commons.cli.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/src/main/java/com/pinterest/secor/main/LogFileVerifierMain.java b/src/main/java/com/pinterest/secor/main/LogFileVerifierMain.java index 350454133..7ca5523f7 100644 --- a/src/main/java/com/pinterest/secor/main/LogFileVerifierMain.java +++ b/src/main/java/com/pinterest/secor/main/LogFileVerifierMain.java @@ -21,12 +21,7 @@ import com.pinterest.secor.common.SecorConfig; import com.pinterest.secor.tools.LogFileVerifier; import com.pinterest.secor.util.FileUtil; -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.CommandLineParser; -import org.apache.commons.cli.GnuParser; -import org.apache.commons.cli.OptionBuilder; -import org.apache.commons.cli.Options; -import org.apache.commons.cli.ParseException; +import org.apache.commons.cli.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/src/main/java/com/pinterest/secor/main/TestLogMessageProducerMain.java b/src/main/java/com/pinterest/secor/main/TestLogMessageProducerMain.java index 1297f5261..24db40bf3 100644 --- a/src/main/java/com/pinterest/secor/main/TestLogMessageProducerMain.java +++ b/src/main/java/com/pinterest/secor/main/TestLogMessageProducerMain.java @@ -19,12 +19,7 @@ package com.pinterest.secor.main; import com.pinterest.secor.tools.TestLogMessageProducer; -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.CommandLineParser; -import org.apache.commons.cli.GnuParser; -import org.apache.commons.cli.OptionBuilder; -import org.apache.commons.cli.Options; -import org.apache.commons.cli.ParseException; +import org.apache.commons.cli.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/src/main/java/com/pinterest/secor/main/ZookeeperClientMain.java b/src/main/java/com/pinterest/secor/main/ZookeeperClientMain.java index 933b0a3a3..2a9f5510e 100644 --- a/src/main/java/com/pinterest/secor/main/ZookeeperClientMain.java +++ b/src/main/java/com/pinterest/secor/main/ZookeeperClientMain.java @@ -21,12 +21,7 @@ import com.pinterest.secor.common.SecorConfig; import com.pinterest.secor.common.TopicPartition; import com.pinterest.secor.common.ZookeeperConnector; -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.CommandLineParser; -import org.apache.commons.cli.GnuParser; -import org.apache.commons.cli.OptionBuilder; -import org.apache.commons.cli.Options; -import org.apache.commons.cli.ParseException; +import org.apache.commons.cli.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/src/main/java/com/pinterest/secor/parser/PartitionFinalizer.java b/src/main/java/com/pinterest/secor/parser/PartitionFinalizer.java index 16383277b..8089528bd 100644 --- a/src/main/java/com/pinterest/secor/parser/PartitionFinalizer.java +++ 
b/src/main/java/com/pinterest/secor/parser/PartitionFinalizer.java @@ -18,11 +18,7 @@ */ package com.pinterest.secor.parser; -import com.pinterest.secor.common.KafkaClient; -import com.pinterest.secor.common.LogFilePath; -import com.pinterest.secor.common.SecorConfig; -import com.pinterest.secor.common.TopicPartition; -import com.pinterest.secor.common.ZookeeperConnector; +import com.pinterest.secor.common.*; import com.pinterest.secor.message.Message; import com.pinterest.secor.util.CompressionUtil; import com.pinterest.secor.util.FileUtil; diff --git a/src/main/java/com/pinterest/secor/parser/QuboleClient.java b/src/main/java/com/pinterest/secor/parser/QuboleClient.java index f6c7e5dc5..0e211207b 100644 --- a/src/main/java/com/pinterest/secor/parser/QuboleClient.java +++ b/src/main/java/com/pinterest/secor/parser/QuboleClient.java @@ -22,11 +22,7 @@ import net.minidev.json.JSONObject; import net.minidev.json.JSONValue; -import java.io.BufferedReader; -import java.io.DataOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; +import java.io.*; import java.net.HttpURLConnection; import java.net.URL; import java.util.Map; diff --git a/src/main/java/com/pinterest/secor/reader/LegacyKafkaMessageIterator.java b/src/main/java/com/pinterest/secor/reader/LegacyKafkaMessageIterator.java index cf9eac67e..6f977d227 100644 --- a/src/main/java/com/pinterest/secor/reader/LegacyKafkaMessageIterator.java +++ b/src/main/java/com/pinterest/secor/reader/LegacyKafkaMessageIterator.java @@ -23,14 +23,7 @@ import com.pinterest.secor.message.Message; import com.pinterest.secor.timestamp.KafkaMessageTimestampFactory; import com.pinterest.secor.util.IdUtil; -import kafka.consumer.Blacklist; -import kafka.consumer.Consumer; -import kafka.consumer.ConsumerConfig; -import kafka.consumer.ConsumerIterator; -import kafka.consumer.ConsumerTimeoutException; -import kafka.consumer.KafkaStream; -import kafka.consumer.TopicFilter; -import kafka.consumer.Whitelist; +import kafka.consumer.*; import kafka.javaapi.consumer.ConsumerConnector; import kafka.message.MessageAndMetadata; import org.slf4j.Logger; diff --git a/src/main/java/com/pinterest/secor/reader/MessageReader.java b/src/main/java/com/pinterest/secor/reader/MessageReader.java index ec8549a8a..b45bd0c31 100644 --- a/src/main/java/com/pinterest/secor/reader/MessageReader.java +++ b/src/main/java/com/pinterest/secor/reader/MessageReader.java @@ -109,8 +109,8 @@ public Message read() { exportStats(); } if (message.getOffset() < committedOffsetCount) { - LOG.debug("skipping message {} because its offset precedes committed offset count {}", - message, committedOffsetCount); + LOG.info("skipping message topic {} offset {} because its offset precedes committed offset count {}", + message.getTopic(), message.getOffset(), committedOffsetCount); return null; } return message; diff --git a/src/main/java/com/pinterest/secor/reader/SecorKafkaMessageIterator.java b/src/main/java/com/pinterest/secor/reader/SecorKafkaMessageIterator.java index 56a8f2b5d..02a8a8847 100644 --- a/src/main/java/com/pinterest/secor/reader/SecorKafkaMessageIterator.java +++ b/src/main/java/com/pinterest/secor/reader/SecorKafkaMessageIterator.java @@ -117,6 +117,7 @@ public void init(SecorConfig config) throws UnknownHostException { optionalConfig(config.getSslProvider(), conf -> props.put("ssl.provider", conf)); optionalConfig(config.getSslTruststoreType(), conf -> props.put("ssl.truststore.type", conf)); 
optionalConfig(config.getNewConsumerPartitionAssignmentStrategyClass(), conf -> props.put("partition.assignment.strategy", conf)); + optionalConfig(config.getMetaDataRefreshInterval(), conf -> props.put("metadata.max.age.ms", conf)); mZookeeperConnector = new ZookeeperConnector(config); mRecordsBatch = new ArrayDeque<>(); @@ -143,7 +144,6 @@ private void optionalConfig(String maybeConf, Consumer configConsumer) { @Override public void subscribe(RebalanceHandler handler, SecorConfig config) { ConsumerRebalanceListener reBalanceListener = new SecorConsumerRebalanceListener(mKafkaConsumer, mZookeeperConnector, getSkipZookeeperOffsetSeek(config), config.getNewConsumerAutoOffsetReset(), handler); - ; String[] subscribeList = config.getKafkaTopicList(); if (Strings.isNullOrEmpty(subscribeList[0])) { diff --git a/src/main/java/com/pinterest/secor/tools/LogFileVerifier.java b/src/main/java/com/pinterest/secor/tools/LogFileVerifier.java index aa43eda48..abbc8ff19 100644 --- a/src/main/java/com/pinterest/secor/tools/LogFileVerifier.java +++ b/src/main/java/com/pinterest/secor/tools/LogFileVerifier.java @@ -29,14 +29,7 @@ import org.apache.hadoop.io.compress.CompressionCodec; import java.io.IOException; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.Map; -import java.util.Set; -import java.util.SortedMap; -import java.util.TreeMap; -import java.util.TreeSet; +import java.util.*; /** * Log file verifier checks the consistency of log files. diff --git a/src/main/java/com/pinterest/secor/tools/ProgressMonitor.java b/src/main/java/com/pinterest/secor/tools/ProgressMonitor.java index 91b27d33d..ea05cfac0 100644 --- a/src/main/java/com/pinterest/secor/tools/ProgressMonitor.java +++ b/src/main/java/com/pinterest/secor/tools/ProgressMonitor.java @@ -38,11 +38,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.BufferedReader; -import java.io.DataOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; +import java.io.*; import java.net.HttpURLConnection; import java.net.URL; import java.util.List; diff --git a/src/main/java/com/pinterest/secor/uploader/S3UploadManager.java b/src/main/java/com/pinterest/secor/uploader/S3UploadManager.java index e987ae71c..d82bb3d65 100644 --- a/src/main/java/com/pinterest/secor/uploader/S3UploadManager.java +++ b/src/main/java/com/pinterest/secor/uploader/S3UploadManager.java @@ -19,12 +19,7 @@ package com.pinterest.secor.uploader; import com.amazonaws.ClientConfiguration; -import com.amazonaws.auth.AWSCredentials; -import com.amazonaws.auth.AWSCredentialsProvider; -import com.amazonaws.auth.BasicAWSCredentials; -import com.amazonaws.auth.BasicSessionCredentials; -import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; -import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider; +import com.amazonaws.auth.*; import com.amazonaws.regions.Region; import com.amazonaws.regions.Regions; import com.amazonaws.services.s3.AmazonS3; diff --git a/src/main/java/com/pinterest/secor/uploader/Uploader.java b/src/main/java/com/pinterest/secor/uploader/Uploader.java index 8ec9fa6cb..185ae4e90 100644 --- a/src/main/java/com/pinterest/secor/uploader/Uploader.java +++ b/src/main/java/com/pinterest/secor/uploader/Uploader.java @@ -25,6 +25,7 @@ import com.pinterest.secor.common.OffsetTracker; import com.pinterest.secor.common.SecorConfig; import com.pinterest.secor.common.SecorConstants; +import com.pinterest.secor.common.SingleProcessCounter; import 
com.pinterest.secor.common.TopicPartition; import com.pinterest.secor.common.ZookeeperConnector; import com.pinterest.secor.io.FileReader; @@ -65,6 +66,7 @@ public class Uploader { protected String mTopicFilter; private boolean isOffsetsStorageKafka = false; + private static SingleProcessCounter spc = SingleProcessCounter.getSingleProcessCounter(); /** @@ -142,6 +144,7 @@ protected void uploadFiles(TopicPartition topicPartition) throws Exception { mMessageReader.commit(topicPartition, lastSeenOffset + 1); } mMetricCollector.increment("uploader.file_uploads.count", paths.size(), topicPartition.getTopic()); + spc.topicUploaded(topicPartition); } else { LOG.warn("Zookeeper committed offset didn't match for topic {} partition {}: {} vs {}", topicPartition.getTopic(), topicPartition.getTopic(), zookeeperCommittedOffsetCount, @@ -182,6 +185,7 @@ private void trim(LogFilePath srcPath, long startOffset) throws Exception { int copiedMessages = 0; // Deleting the writer closes its stream flushing all pending data to the disk. mFileRegistry.deleteWriter(srcPath); + long droppedCounter = 0; try { CompressionCodec codec = null; String extension = ""; @@ -195,19 +199,21 @@ private void trim(LogFilePath srcPath, long startOffset) throws Exception { if (keyVal.getOffset() >= startOffset) { if (writer == null) { String localPrefix = mConfig.getLocalPath() + '/' + - IdUtil.getLocalMessageDir(); + IdUtil.getLocalMessageDir(); dstPath = new LogFilePath(localPrefix, srcPath.getTopic(), - srcPath.getPartitions(), srcPath.getGeneration(), - srcPath.getKafkaPartition(), startOffset, - extension); + srcPath.getPartitions(), srcPath.getGeneration(), + srcPath.getKafkaPartition(), startOffset, + extension); writer = mFileRegistry.getOrCreateWriter(dstPath, - codec); + codec); } writer.write(keyVal); if (mDeterministicUploadPolicyTracker != null) { mDeterministicUploadPolicyTracker.track(topicPartition, keyVal); } copiedMessages++; + } else { + droppedCounter++; } } } finally { @@ -216,6 +222,7 @@ private void trim(LogFilePath srcPath, long startOffset) throws Exception { } } mFileRegistry.deletePath(srcPath); + spc.decrement(topicPartition, droppedCounter); if (dstPath == null) { LOG.info("removed file {}", srcPath.getLogFilePath()); } else { @@ -266,10 +273,21 @@ protected void checkTopicPartition(TopicPartition topicPartition, boolean forceU final long size = mFileRegistry.getSize(topicPartition); final long modificationAgeSec = mFileRegistry.getModificationAgeSec(topicPartition); LOG.debug("size: " + size + " modificationAge: " + modificationAgeSec); - shouldUpload = forceUpload || - size >= mConfig.getMaxFileSizeBytes() || - modificationAgeSec >= mConfig.getMaxFileAgeSeconds() || - isRequiredToUploadAtTime(topicPartition); + + boolean fileSizeTrigger = size >= mConfig.getMaxFileSizeBytes(); + boolean fileAgeTrigger = modificationAgeSec >= mConfig.getMaxFileAgeSeconds(); + boolean uploadTimeTrigger = isRequiredToUploadAtTime(topicPartition); + shouldUpload = forceUpload || fileAgeTrigger + || fileSizeTrigger + || uploadTimeTrigger; + + if (shouldUpload) { + String reason = forceUpload ? "forceUpload" + : fileAgeTrigger ? String.format("fileAgeSec %s is larger than config value %s", modificationAgeSec, mConfig.getMaxFileAgeSeconds()) + : fileSizeTrigger ? 
String.format("fileSizeBytes %s is larger than config value %s", size, mConfig.getMaxFileSizeBytes())
+                    : String.format("requiredToUploadAtMinute %s", mConfig.getUploadMinuteMark());
+            LOG.info("Uploading files for topic partition [{}] because [{}]", topicPartition, reason);
+        }
     }
     if (shouldUpload) {
         long newOffsetCount = mZookeeperConnector.getCommittedOffsetCount(topicPartition);
diff --git a/src/main/java/com/pinterest/secor/util/orc/JsonFieldFiller.java b/src/main/java/com/pinterest/secor/util/orc/JsonFieldFiller.java
index ee0081c2e..7e8af047c 100644
--- a/src/main/java/com/pinterest/secor/util/orc/JsonFieldFiller.java
+++ b/src/main/java/com/pinterest/secor/util/orc/JsonFieldFiller.java
@@ -18,8 +18,6 @@
  */
 package com.pinterest.secor.util.orc;
 
-import java.util.List;
-
 import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
@@ -36,6 +34,8 @@
 import org.codehaus.jettison.json.JSONException;
 import org.codehaus.jettison.json.JSONWriter;
+import java.util.List;
+
 
 /**
  *
  * @author Ashish (ashu.impetus@gmail.com)
diff --git a/src/main/java/com/pinterest/secor/util/orc/VectorColumnFiller.java b/src/main/java/com/pinterest/secor/util/orc/VectorColumnFiller.java
index e5bf55800..c277e571a 100644
--- a/src/main/java/com/pinterest/secor/util/orc/VectorColumnFiller.java
+++ b/src/main/java/com/pinterest/secor/util/orc/VectorColumnFiller.java
@@ -228,7 +228,7 @@ public void convert(JsonElement value, ColumnVector vect, int row) {
     * {@code UnionColumnConverter} is instantiated, as it is given as {@code TypeDescription} which represents an
     * ORC schema. Conversely, when {@code convert()} method is called, limited type information is available because
     * JSON supports three primitive types only: boolean, number, and string.
-     *
+

* The proposed solution for this issue is to register appropriate converters at the time of instantiation with * a matching {@code ColumnVector} index. Note that {@code UnionColumnVector} has child column vectors to support * each of its child type. @@ -416,10 +416,10 @@ public static JsonConverter createConverter(TypeDescription schema) { return new StructColumnConverter(schema); case LIST: return new ListColumnConverter(schema); - case MAP: - return new MapColumnConverter(schema); - case UNION: - return new UnionColumnConverter(schema); + case MAP: + return new MapColumnConverter(schema); + case UNION: + return new UnionColumnConverter(schema); default: throw new IllegalArgumentException("Unhandled type " + schema); } diff --git a/src/main/scripts/docker-entrypoint.sh b/src/main/scripts/docker-entrypoint.sh index 510c70049..2b6c886fc 100644 --- a/src/main/scripts/docker-entrypoint.sh +++ b/src/main/scripts/docker-entrypoint.sh @@ -1,7 +1,6 @@ -#!/bin/sh +#!/bin/bash set -e - SECOR_CONFIG='' if [ -z "$ZOOKEEPER_QUORUM" ]; then diff --git a/src/test/java/com/pinterest/secor/io/impl/JsonORCFileReaderWriterFactoryTest.java b/src/test/java/com/pinterest/secor/io/impl/JsonORCFileReaderWriterFactoryTest.java index d4871abc5..b0af5de2a 100644 --- a/src/test/java/com/pinterest/secor/io/impl/JsonORCFileReaderWriterFactoryTest.java +++ b/src/test/java/com/pinterest/secor/io/impl/JsonORCFileReaderWriterFactoryTest.java @@ -36,9 +36,9 @@ public void setUp() throws Exception { private LogFilePath getTempLogFilePath(String topic) { return new LogFilePath(Files.createTempDir().toString(), - topic, - new String[]{"part-1"}, - 0, 1, 0, ".log" + topic, + new String[]{"part-1"}, + 0, 1, 0, ".log" ); } @@ -117,39 +117,39 @@ private KeyValue[] readRecords(JsonORCFileReaderWriterFactory factory, LogFilePa @Test public void testMapOfStringToString() throws Exception { runCommonTest( - "struct>", - "string-to-string", - "{\"mappings\":{\"key1\":\"value1\",\"key2\":\"value2\"}}" + "struct>", + "string-to-string", + "{\"mappings\":{\"key1\":\"value1\",\"key2\":\"value2\"}}" ); } @Test public void testMapOfStringToInteger() throws Exception { runCommonTest( - "struct>", - "string-to-integer", - "{\"mappings\":{\"key1\":1,\"key2\":-2}}", - "{\"mappings\":{\"key3\":1523,\"key4\":3451325}}", - "{\"mappings\":{\"key5\":0,\"key6\":-8382}}" + "struct>", + "string-to-integer", + "{\"mappings\":{\"key1\":1,\"key2\":-2}}", + "{\"mappings\":{\"key3\":1523,\"key4\":3451325}}", + "{\"mappings\":{\"key5\":0,\"key6\":-8382}}" ); } @Test public void testMultipleMaps() throws Exception { runCommonTest( - "struct\\,f2:map>", - "multiple-maps", - "{\"f1\":{\"k1\":0,\"k2\":1234},\"f2\":{\"k3\":\"test\"}}" + "struct\\,f2:map>", + "multiple-maps", + "{\"f1\":{\"k1\":0,\"k2\":1234},\"f2\":{\"k3\":\"test\"}}" ); } @Test public void testJsonORCReadWriteRoundTrip() throws Exception { runCommonTest( - "struct>", - "round-trip", - "{\"firstname\":\"Jason\",\"age\":48,\"test\":{\"k1\":\"v1\",\"k2\":\"v2\"}}", - "{\"firstname\":\"Christina\",\"age\":37,\"test\":{\"k3\":\"v3\"}}" + "struct>", + "round-trip", + "{\"firstname\":\"Jason\",\"age\":48,\"test\":{\"k1\":\"v1\",\"k2\":\"v2\"}}", + "{\"firstname\":\"Christina\",\"age\":37,\"test\":{\"k3\":\"v3\"}}" ); } @@ -180,39 +180,39 @@ public void testWithLargeKeySet() throws Exception { } runCommonTest( - "struct>", - "large-key-set", - jsonObjects + "struct>", + "large-key-set", + jsonObjects ); } @Test(expected = IllegalArgumentException.class) public void testWithNonStringKeys() throws 
Exception { runCommonTest( - "struct>", - "non-string-keys", - "{0:{1:2,3:4}}" + "struct>", + "non-string-keys", + "{0:{1:2,3:4}}" ); } @Test public void testUnionType() throws Exception { runCommonTest( - "struct>", - "union-type", - "{\"values\":\"stringvalue\"}", - "{\"values\":1234}", - "{\"values\":null}" + "struct>", + "union-type", + "{\"values\":\"stringvalue\"}", + "{\"values\":1234}", + "{\"values\":null}" ); } @Test(expected = UnsupportedOperationException.class) public void testUnionTypeWithNonPrimitive() throws Exception { runCommonTest( - "struct>>", - "union-type-with-non-primitive", - "{\"v1\":1234}", - "{\"v1\":{\"v2\":null,\"v3\":1048576}}" + "struct>>", + "union-type-with-non-primitive", + "{\"v1\":1234}", + "{\"v1\":{\"v2\":null,\"v3\":1048576}}" ); } } \ No newline at end of file
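
Reviewer note: the sketch below is not part of the patch. It is a minimal illustration of how the new SingleProcessCounter introduced above is meant to be exercised by Consumer (increment per written message) and Uploader (topicUploaded after a successful upload). The class and its methods come from the diff; the demo class, its main method, and the sample topic/partition values are assumptions for illustration only.

import com.pinterest.secor.common.SingleProcessCounter;
import com.pinterest.secor.common.TopicPartition;

public class SingleProcessCounterDemo {
    public static void main(String[] args) {
        // Process-wide singleton, shared by Consumer and Uploader in this patch.
        SingleProcessCounter counter = SingleProcessCounter.getSingleProcessCounter();

        // Consumer path: count each message written to the local log file.
        TopicPartition tp = new TopicPartition("test_topic", 0);  // hypothetical topic/partition
        counter.increment(tp, 1L);
        counter.increment(tp, 1L);

        // Uploader path: after the partition's files are uploaded, the local count
        // is folded into the uploaded count and the local count is reset.
        counter.topicUploaded(tp);

        System.out.println("local=" + counter.getLocalCounter(tp));   // 0 after the upload
        System.out.println("total=" + counter.getTotalCounter(tp));   // 2 (local + uploaded, summed over all partitions)
        System.out.println(counter);                                  // per-partition stats dump, as logged in checkUploadPolicy()
    }
}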