From 7dc926338820823189b4111ea0d67bc7f7064ea2 Mon Sep 17 00:00:00 2001 From: ZzzCrazyPig <18825111236@163.com> Date: Sat, 16 Dec 2023 18:32:18 +0800 Subject: [PATCH] to #127 Supported customizeEventDeserializer --- .../deserialization/EventDeserializer.java | 20 +++++- ...ansactionPayloadEventDataDeserializer.java | 25 ++++++- ...ctionPayloadEventDataDeserializerTest.java | 66 ++++++++++++++++++- 3 files changed, 106 insertions(+), 5 deletions(-) diff --git a/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/EventDeserializer.java b/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/EventDeserializer.java index 81dea256..be378ce7 100644 --- a/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/EventDeserializer.java +++ b/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/EventDeserializer.java @@ -130,7 +130,25 @@ private void registerDefaultEventDataDeserializers() { eventDataDeserializers.put(EventType.MARIADB_GTID_LIST, new MariadbGtidListEventDataDeserializer()); eventDataDeserializers.put(EventType.TRANSACTION_PAYLOAD, - new TransactionPayloadEventDataDeserializer()); + new TransactionPayloadEventDataDeserializer().customizeEventDeserializerSupplier(new TransactionPayloadEventDataDeserializer.Supplier() { + @Override + public EventDeserializer get() { + EventDeserializer eventDeserializer = new EventDeserializer( + eventHeaderDeserializer, + defaultEventDataDeserializer, + eventDataDeserializers, + tableMapEventByTableId + ); + + if (!compatibilitySet.isEmpty()) { + CompatibilityMode[] compatibilityModeSettings = new CompatibilityMode[compatibilitySet.size()]; + compatibilitySet.toArray(compatibilityModeSettings); + eventDeserializer.setCompatibilityMode(compatibilityModeSettings[0], compatibilityModeSettings); + } + + return eventDeserializer; + } + })); } public void setEventDataDeserializer(EventType eventType, EventDataDeserializer eventDataDeserializer) { diff --git a/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/TransactionPayloadEventDataDeserializer.java b/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/TransactionPayloadEventDataDeserializer.java index a8e84876..d1d6e12e 100644 --- a/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/TransactionPayloadEventDataDeserializer.java +++ b/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/TransactionPayloadEventDataDeserializer.java @@ -34,6 +34,18 @@ public class TransactionPayloadEventDataDeserializer implements EventDataDeseria public static final int OTW_PAYLOAD_COMPRESSION_TYPE_FIELD = 2; public static final int OTW_PAYLOAD_UNCOMPRESSED_SIZE_FIELD = 3; + private Supplier eventDeserializerSupplier = new Supplier() { + @Override + public EventDeserializer get() { + return new EventDeserializer(); + } + }; + + public TransactionPayloadEventDataDeserializer customizeEventDeserializerSupplier(Supplier supplier) { + this.eventDeserializerSupplier = supplier; + return this; + } + @Override public TransactionPayloadEventData deserialize(ByteArrayInputStream inputStream) throws IOException { TransactionPayloadEventData eventData = new TransactionPayloadEventData(); @@ -86,7 +98,7 @@ public TransactionPayloadEventData deserialize(ByteArrayInputStream inputStream) // Read and store events from decompressed byte array into input stream ArrayList decompressedEvents = new ArrayList<>(); - EventDeserializer transactionPayloadEventDeserializer = new EventDeserializer(); + EventDeserializer transactionPayloadEventDeserializer = obtainEventDeserializer(); ByteArrayInputStream destinationInputStream = new ByteArrayInputStream(dst); Event internalEvent = transactionPayloadEventDeserializer.nextEvent(destinationInputStream); @@ -99,4 +111,15 @@ public TransactionPayloadEventData deserialize(ByteArrayInputStream inputStream) return eventData; } + + protected EventDeserializer obtainEventDeserializer() { + return eventDeserializerSupplier.get(); + } + + public interface Supplier { + + V get(); + + } + } diff --git a/src/test/java/com/github/shyiko/mysql/binlog/event/deserialization/TransactionPayloadEventDataDeserializerTest.java b/src/test/java/com/github/shyiko/mysql/binlog/event/deserialization/TransactionPayloadEventDataDeserializerTest.java index a3b8e76e..e834a4aa 100644 --- a/src/test/java/com/github/shyiko/mysql/binlog/event/deserialization/TransactionPayloadEventDataDeserializerTest.java +++ b/src/test/java/com/github/shyiko/mysql/binlog/event/deserialization/TransactionPayloadEventDataDeserializerTest.java @@ -15,15 +15,15 @@ */ package com.github.shyiko.mysql.binlog.event.deserialization; -import com.github.shyiko.mysql.binlog.event.EventType; -import com.github.shyiko.mysql.binlog.event.TransactionPayloadEventData; -import com.github.shyiko.mysql.binlog.event.XAPrepareEventData; +import com.github.shyiko.mysql.binlog.event.*; import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream; import org.testng.annotations.Test; import java.io.IOException; +import java.io.Serializable; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; /** * @author Somesh Malviya @@ -82,6 +82,8 @@ public class TransactionPayloadEventDataDeserializerTest { .append("]}") .toString(); + private static final byte[] UNCOMPRESSED_UPDATE_EVENT_BEFORE_ROW_0_BYTE_ARRAY = new byte[] {1, 0, 0, 0}; + @Test public void deserialize() throws IOException { TransactionPayloadEventDataDeserializer deserializer = new TransactionPayloadEventDataDeserializer(); @@ -97,4 +99,62 @@ public void deserialize() throws IOException { assertEquals(EventType.XID, transactionPayloadEventData.getUncompressedEvents().get(3).getHeader().getEventType()); assertEquals(UNCOMPRESSED_UPDATE_EVENT, transactionPayloadEventData.getUncompressedEvents().get(2).getData().toString()); } + + @Test + public void deserializeUsingEventDeserializer() throws IOException { + + ByteArrayInputStream dataStream = new ByteArrayInputStream(DATA); + + // Mock create target TransactionPayloadEventData DATA event header + final EventHeaderV4 eventHeader = new EventHeaderV4(); + eventHeader.setEventType(EventType.TRANSACTION_PAYLOAD); + eventHeader.setEventLength(DATA.length + 19L); + eventHeader.setTimestamp(1646406641000L); + eventHeader.setServerId(223344); + + + EventHeaderDeserializer eventHeaderDeserializer = new EventHeaderDeserializer() { + + private long count = 0L; + + private EventHeaderDeserializer defaultEventHeaderDeserializer = new EventHeaderV4Deserializer(); + + @Override + public EventHeader deserialize(ByteArrayInputStream inputStream) throws IOException { + if (count > 0) { + // uncompressed event header deserialize + return defaultEventHeaderDeserializer.deserialize(inputStream); + } + count++; + // we need to return target TransactionPayloadEventData DATA event header we had mocked + return eventHeader; + } + }; + + EventDeserializer eventDeserializer = new EventDeserializer(eventHeaderDeserializer, new NullEventDataDeserializer()); + eventDeserializer.setCompatibilityMode(EventDeserializer.CompatibilityMode.INTEGER_AS_BYTE_ARRAY); + + Event event = eventDeserializer.nextEvent(dataStream); + + assertTrue(event.getHeader().getEventType() == EventType.TRANSACTION_PAYLOAD); + assertTrue(event.getData() instanceof TransactionPayloadEventData); + + TransactionPayloadEventData transactionPayloadEventData = event.getData(); + assertEquals(COMPRESSION_TYPE, transactionPayloadEventData.getCompressionType()); + assertEquals(PAYLOAD_SIZE, transactionPayloadEventData.getPayloadSize()); + assertEquals(UNCOMPRESSED_SIZE, transactionPayloadEventData.getUncompressedSize()); + assertEquals(NUMBER_OF_UNCOMPRESSED_EVENTS, transactionPayloadEventData.getUncompressedEvents().size()); + assertEquals(EventType.QUERY, transactionPayloadEventData.getUncompressedEvents().get(0).getHeader().getEventType()); + assertEquals(EventType.TABLE_MAP, transactionPayloadEventData.getUncompressedEvents().get(1).getHeader().getEventType()); + assertEquals(EventType.EXT_UPDATE_ROWS, transactionPayloadEventData.getUncompressedEvents().get(2).getHeader().getEventType()); + assertEquals(EventType.XID, transactionPayloadEventData.getUncompressedEvents().get(3).getHeader().getEventType()); +// assertEquals(UNCOMPRESSED_UPDATE_EVENT, transactionPayloadEventData.getUncompressedEvents().get(2).getData().toString()); + assertTrue(transactionPayloadEventData.getUncompressedEvents().get(2).getData() instanceof UpdateRowsEventData); + + UpdateRowsEventData updateRowsEventData = transactionPayloadEventData.getUncompressedEvents().get(2).getData(); + assertEquals(1, updateRowsEventData.getRows().size()); + Serializable[] updateBefore = updateRowsEventData.getRows().get(0).getKey(); + assertEquals(UNCOMPRESSED_UPDATE_EVENT_BEFORE_ROW_0_BYTE_ARRAY, updateBefore[0]); + } + }