Support for collections #21

Open · wants to merge 10 commits into master

Conversation

Lorak-mmk
Contributor

Based on #12 and includes changes from it, so review should be done on a per-commit basis.

This PR adds simple support for non-frozen collections.

There is a new config option, scylla.collections.mode; currently the only possible value is "simple". It selects the format used for non-frozen collections (in the future we could add preimage-based modes, etc.).
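As a minimal sketch of how the option could be set when building the connector configuration programmatically (the connector class name and everything other than scylla.collections.mode are assumptions, shown only for illustration):

import java.util.HashMap;
import java.util.Map;

public class CollectionsModeConfigSketch {
    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        // Assumed connector class, based on the com.scylladb.cdc.debezium.connector package.
        props.put("connector.class", "com.scylladb.cdc.debezium.connector.ScyllaConnector");
        // The new option; "simple" is currently the only accepted value.
        props.put("scylla.collections.mode", "simple");
        System.out.println(props);
    }
}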

The simple-mode collection format is described in README.md (along with the frozen collections format). In brief: non-frozen collections are represented as structs with two fields, "mode" and "elements". "mode" indicates the type of operation (add elements, remove elements, overwrite the collection); "elements" contains the actual elements used in the operation.
For a set, "elements" is simply a set.
For a list, "elements" is a map with a timeuuid key type. When removing elements, the values are null.
For a map, "elements" is simply a map. When removing elements, the values are null.
UDTs are the most complicated. A UDT is represented as a struct, but each field is a Cell, with the same semantics as a column's "Cell": null means no change, non-null with a null value field means removal, and non-null with a non-null value field means a new value.
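To make the layout concrete, here is a rough Kafka Connect schema/value sketch for a non-frozen map<int, text> column (only an illustration of the description above, not code taken from the PR; the mode value "ADD" stands in for the add-elements operation):

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;

import java.util.HashMap;
import java.util.Map;

public class SimpleModeMapSketch {
    public static void main(String[] args) {
        // "elements" for a map column is itself a map; a null value marks a removed key.
        Schema elementsSchema = SchemaBuilder.map(Schema.INT32_SCHEMA, Schema.OPTIONAL_STRING_SCHEMA)
                .optional().build();
        Schema collectionSchema = SchemaBuilder.struct()
                .field("mode", Schema.STRING_SCHEMA)   // type of operation: add / remove / overwrite
                .field("elements", elementsSchema)
                .optional().build();

        Map<Integer, String> elements = new HashMap<>();
        elements.put(1, "a");
        elements.put(2, "b");

        Struct change = new Struct(collectionSchema);
        change.put("mode", "ADD");
        change.put("elements", elements);
        System.out.println(change);
    }
}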

I haven't tested it with Avro yet.

avelanarius and others added 6 commits September 9, 2021 18:35
Add support for including frozen lists in generated changes. Made
necessary changes to support nested data types.
Add support for including frozen sets in generated changes.
Add support for including frozen maps in generated changes.
Add support for including tuples in generated changes. For a tuple,
a Kafka Connect struct is created with "tuple_member_*" for each member
of a tuple (as they can have different data types inside).
Add support for including frozen UDTs in generated changes.
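For illustration, the tuple representation described in the commit above could correspond to a Kafka Connect schema roughly like the following (a hypothetical sketch; the zero-based member numbering is an assumption):

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;

public class TupleSketch {
    public static void main(String[] args) {
        // A tuple<int, text> becomes a struct with one "tuple_member_*" field per member,
        // since the members can have different data types.
        Schema tupleSchema = SchemaBuilder.struct()
                .field("tuple_member_0", Schema.OPTIONAL_INT32_SCHEMA)
                .field("tuple_member_1", Schema.OPTIONAL_STRING_SCHEMA)
                .optional().build();

        Struct tuple = new Struct(tupleSchema);
        tuple.put("tuple_member_0", 7);
        tuple.put("tuple_member_1", "foo");
        System.out.println(tuple);
    }
}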
@Lorak-mmk Lorak-mmk changed the title Non frozen collections Support for non-frozen collections Feb 3, 2022
@Lorak-mmk
Contributor Author

Lorak-mmk commented Feb 8, 2022

Now that I think of it, maybe it's redundant to have a "REMOVE" mode, and removals should be represented as setting to null (as is currently the case for UDTs)? Then we would have two modes, say "UPDATE" and "OVERWRITE"; the difference between them would be whether the collection is cleared before the operation.
@avelanarius

@avelanarius

> Now that I think of it, maybe it's redundant to have a "REMOVE" mode, and removals should be represented as setting to null (as is currently the case for UDTs)? Then we would have two modes, say "UPDATE" and "OVERWRITE"; the difference between them would be whether the collection is cleared before the operation. @avelanarius

I don't see how it would work for sets?

@hartmut-co-uk

Opinion:
Hi, for me it would be great if we could also have the option (configurable?) to just emit FROZEN collections 'as-is' (...always the full latest value).
=> so without the extra ELEMENTS_VALUE, REMOVED_ELEMENTS_VALUE, MODE_VALUE fields.

That would make the output record look cleaner and more like what you'd get if you queried Scylla directly.

@Lorak-mmk Lorak-mmk force-pushed the non-frozen-collections branch from 034d0d5 to a4c715a Compare February 18, 2022 17:52
@Lorak-mmk
Contributor Author

I pushed a new version, with a slightly different representation.
It had to be changed because the previous one didn't work well with queries performing more than one modification on a given collection, e.g.: UPDATE ks.t_list SET v = v - [6, 7], v = v + [4, 5] WHERE pk = 1;

Now there are only 2 modes, OVERWRITE and MODIFY, and the collection struct always has 2 fields: mode and elements.
For lists/maps, elements is a map; an element is added/overwritten if its value is not null, and removed otherwise.
For sets, elements is a map with boolean values: true means the value was added to the set, false means it was removed.
UDTs didn't change.
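A rough sketch of what this looks like for a set<int> column (illustrative only, not the exact code or schema names from the PR):

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;

import java.util.LinkedHashMap;
import java.util.Map;

public class SetDeltaSketch {
    public static void main(String[] args) {
        // For a set column, "elements" maps each element to a boolean:
        // true = element added, false = element removed.
        Schema elementsSchema = SchemaBuilder.map(Schema.INT32_SCHEMA, Schema.BOOLEAN_SCHEMA)
                .optional().build();
        Schema collectionSchema = SchemaBuilder.struct()
                .field("mode", Schema.STRING_SCHEMA)   // OVERWRITE or MODIFY
                .field("elements", elementsSchema)
                .optional().build();

        // e.g. UPDATE ks.t SET v = v + {4, 5}, v = v - {6} WHERE pk = 1;
        Map<Integer, Boolean> elements = new LinkedHashMap<>();
        elements.put(4, true);
        elements.put(5, true);
        elements.put(6, false);

        Struct change = new Struct(collectionSchema);
        change.put("mode", "MODIFY");
        change.put("elements", elements);
        System.out.println(change);
    }
}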

I also renamed the SIMPLE mode to DELTA, to better reflect what it actually is.

@avelanarius @haaawk

> Opinion: Hi, for me it would be great if we could also have the option (configurable?) to just emit FROZEN collections 'as-is' (...always the full latest value). => so without the extra ELEMENTS_VALUE, REMOVED_ELEMENTS_VALUE, MODE_VALUE fields.
>
> That would make the output record look cleaner and more like what you'd get if you queried Scylla directly.

Yes, that would of course be better, but it is harder (it requires preimage/postimage usage) and will be added in the future; that's why I added a config option to select the mode for non-frozen collections.

@Lorak-mmk Lorak-mmk changed the title Support for non-frozen collections Support for collections Jan 11, 2024
@Bouncheck Bouncheck self-requested a review December 2, 2024 13:19
@Bouncheck Bouncheck self-assigned this Dec 2, 2024
@Bouncheck
Collaborator

I've tried rebasing (no significant changes were made) and running this change, and there are two issues:

1. PARTITION_DELETE does not work; I'm getting the following stack trace:
org.apache.kafka.connect.errors.ConnectException: Error while processing event at offset {window_end=ace3e75f-b2b0-11ef-7f7f-7f7f7f7f7f7f, change_id_time=a925ed8a-b2b0-11ef-082d-47b54b69ae2e, change_id_stream_id=0xd0b900000000000033dfba5b58000571, window_start=9b024460-b2b0-11ef-8080-808080808080}
	at io.debezium.pipeline.EventDispatcher.dispatchDataChangeEvent(EventDispatcher.java:225)
	at com.scylladb.cdc.debezium.connector.ScyllaChangesConsumer.consume(ScyllaChangesConsumer.java:81)
	at com.scylladb.cdc.model.worker.Consumer$2.consume(Consumer.java:34)
	at com.scylladb.cdc.model.worker.TaskAction$ConsumeChangeTaskAction.run(TaskAction.java:163)
	at com.scylladb.cdc.model.worker.Worker.lambda$makeCallable$4(Worker.java:115)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.NullPointerException
	at com.scylladb.cdc.debezium.connector.ScyllaChangeRecordEmitter.translateNonFrozenCollectionToKafka(ScyllaChangeRecordEmitter.java:246)
	at com.scylladb.cdc.debezium.connector.ScyllaChangeRecordEmitter.fillStructWithChange(ScyllaChangeRecordEmitter.java:151)
	at com.scylladb.cdc.debezium.connector.ScyllaChangeRecordEmitter.emitDeleteRecord(ScyllaChangeRecordEmitter.java:128)
	at com.scylladb.cdc.debezium.connector.ScyllaChangeRecordEmitter.emitDeleteRecord(ScyllaChangeRecordEmitter.java:31)
	at io.debezium.pipeline.AbstractChangeRecordEmitter.emitChangeRecords(AbstractChangeRecordEmitter.java:44)
	at io.debezium.pipeline.EventDispatcher.dispatchDataChangeEvent(EventDispatcher.java:198)
	... 9 more

Since it's a rebased version, here's the relevant section of ScyllaChangeRecordEmitter for context:

                Map<String, Field> elementsMap = elementsCell.getUDT();
                assert elementsMap instanceof LinkedHashMap;

                Schema udtSchema = innerSchema.field(ScyllaSchema.ELEMENTS_VALUE).schema();
                Struct udtStruct = new Struct(udtSchema);
                Short index = -1;
                //line 246 is below
                for (Map.Entry<String, Field> element : elementsMap.entrySet()) {
                    index++;
                    if ((!element.getValue().isNull()) || deletedKeys.contains(index)) {
                        hasModified = true;
                        Schema fieldCellSchema = udtSchema.field(element.getKey()).schema();
                        Struct fieldCell = new Struct(fieldCellSchema);

@Lorak-mmk I see that there is an assert before that line. It's likely not firing because connectors are run without the -ea flag. Do you remember whether elementsMap is guaranteed to be non-null, or whether that can be handled here somehow?
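For illustration only, the kind of guard I mean would be something like this minimal, hypothetical sketch (whether skipping the translation on a null map is the right semantics for a partition delete is exactly the open question):

import java.util.Map;

final class ElementsMapGuardSketch {
    // Hypothetical helper: returns true only if there is a UDT payload to translate.
    // A null map, as apparently produced for partition deletes, would then be skipped
    // instead of triggering the NullPointerException above.
    static boolean hasTranslatableElements(Map<String, ?> elementsMap) {
        return elementsMap != null && !elementsMap.isEmpty();
    }
}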

2. Avro does not seem to work. I have yet to figure out what exactly is wrong and whether it's relevant to all Kafka platforms or just Confluent because of the schema registry:
ERROR [ScyllaConnectorConnector_avro|task-0] WorkerSourceTask{id=ScyllaConnectorConnector_avro-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:207)
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:237)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:159)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.convertTransformedRecord(WorkerSourceTask.java:334)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:360)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:273)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:200)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:255)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.connect.errors.DataException: Failed to serialize Avro data from topic avro2.my_keyspace.cdc_enabled_table_5 :
        at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:93)
        at org.apache.kafka.connect.storage.Converter.fromConnectData(Converter.java:64)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.lambda$convertTransformedRecord$4(WorkerSourceTask.java:334)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:183)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:217)
        ... 11 more
Caused by: org.apache.kafka.common.errors.SerializationException: Error serializing Avro message
        at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:166)
        at io.confluent.connect.avro.AvroConverter$Serializer.serialize(AvroConverter.java:153)
        at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:86)
        ... 15 more
Caused by: org.apache.avro.SchemaParseException: Can't redefine: io.confluent.connect.avro.MapEntry
        at org.apache.avro.Schema$Names.put(Schema.java:1550)
        at org.apache.avro.Schema$NamedSchema.writeNameRef(Schema.java:813)
        at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:975)
        at org.apache.avro.Schema$ArraySchema.toJson(Schema.java:1137)
        at org.apache.avro.Schema$RecordSchema.fieldsToJson(Schema.java:1003)
        at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:987)
        at org.apache.avro.Schema$UnionSchema.toJson(Schema.java:1242)
        at org.apache.avro.Schema$RecordSchema.fieldsToJson(Schema.java:1003)
        at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:987)
        at org.apache.avro.Schema$UnionSchema.toJson(Schema.java:1242)
        at org.apache.avro.Schema$RecordSchema.fieldsToJson(Schema.java:1003)
        at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:987)
        at org.apache.avro.Schema$UnionSchema.toJson(Schema.java:1242)
        at org.apache.avro.Schema$RecordSchema.fieldsToJson(Schema.java:1003)
        at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:987)
        at org.apache.avro.Schema.toString(Schema.java:426)
        at org.apache.avro.Schema.toString(Schema.java:417)
        at io.confluent.kafka.schemaregistry.avro.AvroSchema.canonicalString(AvroSchema.java:157)
        at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:274)
        at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:381)
        at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:354)
        at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:125)
        ... 17 more

I'll try to make it work and post another update if I find anything significant.

@Bouncheck
Collaborator

Also, in "Fix: allow null sets/maps/lists" I don't see changes related to sets. Were they already OK and did not require fixing?

@Lorak-mmk
Contributor Author

I have not worked with Kafka for a few years now. Could you write a list of steps to set this up and reproduce the issue so I can debug it?
