Skip to content

Commit 57be4c9

Browse files
committed
Perform some renamings
1 parent 38800cd commit 57be4c9

File tree

7 files changed

+43
-81
lines changed

7 files changed

+43
-81
lines changed

src/main/java/io/cdap/plugin/jms/common/SchemaValidationUtils.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -208,7 +208,7 @@ public static void validateObjectMessageSchema(Schema schema, FailureCollector c
208208
* @param schema the user defined schema
209209
* @param collector the failure collector
210210
*/
211-
public static void validateBytesMessageInputSchema(Schema schema, FailureCollector collector) {
211+
public static void validateBytesMessageSchema(Schema schema, FailureCollector collector) {
212212
Schema.Type type = schema.getField(JMSMessageParts.BODY).getSchema().getType();
213213
boolean isTypeString = type.equals(Schema.Type.STRING);
214214
boolean isTypeRecord = type.equals(Schema.Type.RECORD);

src/main/java/io/cdap/plugin/jms/sink/JMSBatchSink.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@
3939
*/
4040
@Plugin(type = BatchSink.PLUGIN_TYPE)
4141
@Name("JMS")
42-
@Description("JMS sink to write events to JMS")
42+
@Description("JMSSink")
4343
public class JMSBatchSink extends ReferenceBatchSink<StructuredRecord, NullWritable, StructuredRecord> {
4444

4545
private final JMSBatchSinkConfig config;

src/main/java/io/cdap/plugin/jms/sink/JMSBatchSinkConfig.java

+1-3
Original file line numberDiff line numberDiff line change
@@ -24,15 +24,13 @@
2424
import io.cdap.cdap.etl.api.FailureCollector;
2525
import io.cdap.plugin.jms.common.JMSConfig;
2626
import io.cdap.plugin.jms.common.JMSMessageType;
27-
import io.cdap.plugin.jms.source.JMSSourceUtils;
2827

2928
import java.io.IOException;
3029
import java.io.Serializable;
3130
import javax.annotation.Nullable;
3231

3332
/**
34-
* Holds configuration required for configuring {@link JMSSourceUtils ;} and
35-
* {@link JMSBatchSink}.
33+
* Holds the necessary configurations for the JMS source plugin
3634
*/
3735
public class JMSBatchSinkConfig extends JMSConfig implements Serializable {
3836

src/main/java/io/cdap/plugin/jms/source/JMSSourceUtils.java

-37
This file was deleted.

src/main/java/io/cdap/plugin/jms/source/JMSStreamingSource.java

+12-5
Original file line numberDiff line numberDiff line change
@@ -27,17 +27,19 @@
2727
import io.cdap.cdap.etl.api.streaming.StreamingSource;
2828
import io.cdap.cdap.etl.api.streaming.StreamingSourceContext;
2929
import io.cdap.plugin.common.LineageRecorder;
30+
import org.apache.spark.storage.StorageLevel;
3031
import org.apache.spark.streaming.api.java.JavaDStream;
32+
import org.apache.spark.streaming.receiver.Receiver;
3133

3234
import java.util.stream.Collectors;
3335

3436
/**
35-
* This class <code>JMSStreamingSource</code> is a plugin that allows consuming messages from a specified JMS
36-
* Queue/Topic and generate StructuredRecords out of them.
37+
* This class is a plugin that allows consuming messages from a specified JMS Queue/Topic and generate
38+
* StructuredRecords out of them.
3739
*/
3840
@Plugin(type = StreamingSource.PLUGIN_TYPE)
3941
@Name("JMS")
40-
@Description("JMS (Java Messaging Service) Source")
42+
@Description("JMSSource")
4143
public class JMSStreamingSource extends ReferenceStreamingSource<StructuredRecord> {
4244

4345
private JMSStreamingSourceConfig config;
@@ -57,7 +59,6 @@ public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
5759
@Override
5860
public void prepareRun(StreamingSourceContext context) throws Exception {
5961
Schema schema = config.getSchema();
60-
// record dataset lineage
6162
context.registerLineage(config.referenceName, schema);
6263

6364
if (schema.getFields() != null) {
@@ -72,6 +73,12 @@ public JavaDStream<StructuredRecord> getStream(StreamingContext context) throws
7273
FailureCollector collector = context.getFailureCollector();
7374
config.validate(collector);
7475
collector.getOrThrowException();
75-
return JMSSourceUtils.getJavaDStream(context, config);
76+
return getJavaDStream(context, config);
77+
}
78+
79+
private static JavaDStream<StructuredRecord> getJavaDStream(StreamingContext context,
80+
JMSStreamingSourceConfig config) {
81+
Receiver<StructuredRecord> jmsReceiver = new JMSReceiver(StorageLevel.MEMORY_AND_DISK_SER_2(), config);
82+
return context.getSparkStreamingContext().receiverStream(jmsReceiver);
7683
}
7784
}

src/main/java/io/cdap/plugin/jms/source/JMSStreamingSourceConfig.java

+27-33
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,11 @@
2727
import io.cdap.plugin.jms.common.JMSMessageHeader;
2828
import io.cdap.plugin.jms.common.JMSMessageParts;
2929
import io.cdap.plugin.jms.common.JMSMessageType;
30+
import io.cdap.plugin.jms.common.SchemaValidationUtils;
3031

3132
import java.io.IOException;
3233
import java.io.Serializable;
3334
import java.util.ArrayList;
34-
import java.util.Arrays;
3535
import java.util.List;
3636
import java.util.stream.Collectors;
3737
import javax.annotation.Nullable;
@@ -113,52 +113,46 @@ public void validate(FailureCollector failureCollector) {
113113
}
114114

115115
if (!containsMacro(NAME_SCHEMA)) {
116-
117116
Schema schema = getSchema();
118117

119-
boolean otherFieldsExist = schema
120-
.getFields()
121-
.stream()
122-
.anyMatch(field -> !JMSMessageParts.getJMSMessageParts().contains(field.getName()));
123-
124-
if (otherFieldsExist) {
125-
failureCollector
126-
.addFailure("New fields detected in the output schema!",
127-
String.format("Only \"%s\", \"%s\" and \"%s\" fields are allowed.",
128-
JMSMessageParts.HEADER, JMSMessageParts.BODY, JMSMessageParts.PROPERTIES))
129-
.withConfigProperty(NAME_SCHEMA);
118+
SchemaValidationUtils.validateIfAnyNotSupportedRootFieldExists(schema, failureCollector);
119+
120+
if (getMessageHeader()) {
121+
SchemaValidationUtils.validateHeaderSchema(schema, failureCollector);
122+
}
123+
124+
if (getMessageProperties()) {
125+
SchemaValidationUtils.validatePropertiesSchema(schema, failureCollector);
130126
}
131127

132128
switch (messageType) {
133129
case JMSMessageType.TEXT:
134-
if (!schema.getField(JMSMessageParts.BODY).getSchema().getType().equals(Schema.Type.STRING)) {
135-
failureCollector
136-
.addFailure(String.format("Wrong data type for field \"%s\".", JMSMessageParts.BODY),
137-
String.format("For JMS %s type of message, \"%s\" must be of String data type.",
138-
messageType, JMSMessageParts.BODY)).withConfigProperty(NAME_SCHEMA);
139-
}
130+
SchemaValidationUtils.validateIfBodyNotInSchema(schema, failureCollector);
131+
SchemaValidationUtils.validateTextMessageSchema(schema, failureCollector);
132+
140133
break;
141134

142135
case JMSMessageType.OBJECT:
143-
if (!schema.getField(JMSMessageParts.BODY).getSchema().getType().equals(Schema.Type.ARRAY)) {
144-
failureCollector
145-
.addFailure(String.format("Wrong data type for field \"%s\".", JMSMessageParts.BODY),
146-
String.format("For JMS %s type of message, \"%s\" must be of Array[Bytes] data type.",
147-
messageType, JMSMessageParts.BODY)).withConfigProperty(NAME_SCHEMA);
148-
}
136+
SchemaValidationUtils.validateIfBodyNotInSchema(schema, failureCollector);
137+
SchemaValidationUtils.validateObjectMessageSchema(schema, failureCollector);
149138
break;
150139

151140
case JMSMessageType.BYTES:
141+
SchemaValidationUtils.validateIfBodyNotInSchema(schema, failureCollector);
142+
SchemaValidationUtils.validateBytesMessageSchema(schema, failureCollector);
143+
144+
break;
145+
152146
case JMSMessageType.MAP:
153-
case JMSMessageType.MESSAGE:
154-
if (!Arrays.asList(Schema.Type.STRING, Schema.Type.RECORD)
155-
.contains(schema.getField(JMSMessageParts.BODY).getSchema().getType())) {
156-
failureCollector
157-
.addFailure(String.format("Wrong data type for field \"%s\"", JMSMessageParts.BODY),
158-
String.format("For JMS %s type of message, \"%s\" must be of String or Record data type.",
159-
messageType, JMSMessageParts.BODY)).withConfigProperty(NAME_SCHEMA);
160-
}
147+
SchemaValidationUtils.validateIfBodyNotInSchema(schema, failureCollector);
148+
SchemaValidationUtils.validateMapMessageSchema(schema, failureCollector);
149+
161150
break;
151+
152+
case JMSMessageType.MESSAGE:
153+
SchemaValidationUtils.validateMessageSchema(schema, failureCollector);
154+
155+
162156
}
163157
}
164158
}

src/test/java/io/cdap/plugin/jms/common/SchemaValidationUtilsTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@ public void validateByteMessageSchema_WithNonStringOrRecordBodyRootField_ShouldT
172172
Schema schema = Schema
173173
.recordOf("record", Schema.Field.of(JMSMessageParts.BODY, Schema.of(Schema.Type.INT)));
174174

175-
SchemaValidationUtils.validateBytesMessageInputSchema(schema, null);
175+
SchemaValidationUtils.validateBytesMessageSchema(schema, null);
176176
}
177177

178178
@Test

0 commit comments

Comments
 (0)