Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

JsonSchemaConverter does not fully support draft-07 as advertised in readme #368

Open
javierlarota opened this issue Feb 17, 2025 · 0 comments

Comments

@javierlarota
Copy link

javierlarota commented Feb 17, 2025

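The aws-glue-schema-registry JsonSchemaConverter fails when a draft-07 JSON Schema contains if/then/else conditionals: it throws a DataException with "Unsupported schema type org.everit.json.schema.ConditionalSchema" (full stack trace below). Everything works perfectly when using Kafka Schema Registry, but I would like to migrate to AWS Glue Schema Registry.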

[2025-02-17 21:12:50,667] ERROR Error encountered in task Test-0. Executing stage 'VALUE_CONVERTER' with class 'com.amazonaws.services.schemaregistry.kafkaconnect.jsonschema.JsonSchemaConverter'. (org.apache.kafka.connect.runtime.errors.LogReporter)
org.apache.kafka.connect.errors.DataException: Unsupported schema type org.everit.json.schema.ConditionalSchema
        at com.amazonaws.services.schemaregistry.kafkaconnect.jsonschema.JsonSchemaToConnectSchemaConverter.toConnectSchema(JsonSchemaToConnectSchemaConverter.java:107)
        at com.amazonaws.services.schemaregistry.kafkaconnect.jsonschema.JsonSchemaToConnectSchemaConverter.toConnectSchema(JsonSchemaToConnectSchemaConverter.java:63)
        at com.amazonaws.services.schemaregistry.kafkaconnect.jsonschema.JsonSchemaToConnectSchemaConverter.buildNonOptionalUnionSchema(JsonSchemaToConnectSchemaConverter.java:138)
        at com.amazonaws.services.schemaregistry.kafkaconnect.jsonschema.JsonSchemaToConnectSchemaConverter.toConnectSchema(JsonSchemaToConnectSchemaConverter.java:101)
        at com.amazonaws.services.schemaregistry.kafkaconnect.jsonschema.JsonSchemaToConnectSchemaConverter.toConnectSchema(JsonSchemaToConnectSchemaConverter.java:63)
        at com.amazonaws.services.schemaregistry.kafkaconnect.jsonschema.JsonSchemaToConnectSchemaConverter.buildNonOptionalUnionSchema(JsonSchemaToConnectSchemaConverter.java:138)
        at com.amazonaws.services.schemaregistry.kafkaconnect.jsonschema.JsonSchemaToConnectSchemaConverter.toConnectSchema(JsonSchemaToConnectSchemaConverter.java:101)
        at com.amazonaws.services.schemaregistry.kafkaconnect.jsonschema.JsonSchemaToConnectSchemaConverter.toConnectSchema(JsonSchemaToConnectSchemaConverter.java:63)
        at com.amazonaws.services.schemaregistry.kafkaconnect.jsonschema.JsonSchemaConverter.toConnectData(JsonSchemaConverter.java:201)
        at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:91)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$4(WorkerSinkTask.java:540)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:207)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:244)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:179)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:540)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:517)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:343)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:246)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:215)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:225)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:280)
        at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:237)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
        at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
        at java.base/java.lang.Thread.run(Unknown Source)

JSON Payload

  {
    "accountId": "123456",
    "timeStampCreated": 334455667788,
    "accountAlertType": "CheckingBasic_low_balance",
    "isActive": true,
    "helpUrl": "https://aaaaa.com",
    "accountType": "CheckingBasic",
    "accountCategory": "Checking"
  }

JSON Schema

{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "type": "object",
  "additionalProperties": false,
  "properties": {
    "accountId": {
      "type": "string",
      "pattern": "^[0-9]+$"
    },
    "timeStampCreated": {
      "type": "integer",
      "format": "int64",
      "minimum": 0,
      "maximum": 253402300799999
    },
    "accountAlertType": {
      "type": "string"
    },
    "isActive": {
      "type": "boolean"
    },
    "helpUrl": {
      "type": "string",
      "pattern": "^(https?:\\/\\/)(?!.*(select|[Ss][Ee][Ll][Ee][Cc][Tt]|insert|[Ii][Nn][Ss][Ee][Rr][Tt]|update|[Uu][Pp][Dd][Aa][Tt][Ee]|delete|[Dd][Ee][Ll][Ee][Tt][Ee]|drop|[Dd][Rr][Oo][Pp]|alter|[Aa][Ll][Tt][Ee][Rr]|create|[Cc][Rr][Ee][Aa][Tt][Ee]|truncate|[Tt][Rr][Uu][Nn][Cc][Aa][Tt][Ee]|exec|[Ee][Xx][Ee][Cc]|declare|[Dd][Ee][Cc][Ll][Aa][Rr][Ee]|--|;|<script|<[Ss][Cc][Rr][Ii][Pp][Tt]|<.*?[Jj][Aa][Vv][Aa][Ss][Cc][Rr][Ii][Pp][Tt]:|<.*?\\s+[Oo][Nn]|<.*?\\s+[Ss][Tt][Yy][Ll][Ee]=.*?[Ee][Xx][Pp][Rr][Ee][Ss][Ss][Ii][Oo][Nn]\\(.*?\\))).*$",
      "minLength": 1,
      "maxLength": 500
    },
    "accountType": {
      "type": "string",
      "enum": [
        "CheckingBasic",
        "SavingBasic",
        "SavingPlus"
      ]
    },
    "accountCategory": {
      "type": "string",
      "enum": [
        "Checking",
        "Saving"
      ]
    }
  },
  "required": [
    "accountId",
    "timeStampCreated",
    "accountAlertType",
    "isActive",
    "helpUrl",
    "accountType",
    "accountCategory"
  ],
  "allOf": [{
      "if": {
        "properties": {
          "accountCategory": {
            "const": "Saving"
          }
        }
      },
      "then": {
        "properties": {
          "accountType": {
            "enum": [
              "SavingBasic",
              "SavingPlus"
            ]
          }
        }
      }
    }, {
      "if": {
        "properties": {
          "accountCategory": {
            "const": "Checking"
          }
        }
      },
      "then": {
        "properties": {
          "accountType": {
            "enum": [
              "CheckingBasic"
            ]
          }
        }
      }
    }, {
      "if": {
        "properties": {
          "accountType": {
            "const": "CheckingBasic"
          }
        }
      },
      "then": {
        "properties": {
          "accountAlertType": {
            "enum": [
              "CheckingBasic_low_balance",
              "CheckingBasic_suspicious_activity"
            ]
          }
        }
      }
    }, {
      "if": {
        "properties": {
          "accountType": {
            "const": "SavingPlus"
          }
        }
      },
      "then": {
        "properties": {
          "accountAlertType": {
            "enum": [
              "SavingPlus_low_balance"
            ]
          }
        }
      }
    }, {
      "if": {
        "properties": {
          "accountType": {
            "const": "SavingBasic"
          }
        }
      },
      "then": {
        "properties": {
          "accountAlertType": {
            "enum": [
              "accountAlertType_not_defined"
            ]
          }
        }
      }
    }
  ]
}

Kafka Connect

  kafka-connect:
    hostname: kafka-connect
    container_name: kafka-connect
    image: confluentinc/cp-kafka-connect:7.8.0
    depends_on:
      - kafka
    environment:
      AWS_ACCESS_KEY_ID: ASIAWW***********
      AWS_SECRET_ACCESS_KEY: "uyKfHrscEiu**************************"
      AWS_SESSION_TOKEN: "IQoJb3JpZ2luX2VjEFUaCXVzLWVhc3Qt*********************X539cQ="
      CONNECT_LOG4J_ROOT_LOGLEVEL: INFO
      CONNECT_REST_ADVERTISED_HOST_NAME: kafka-connect
      CONNECT_BOOTSTRAP_SERVERS: 'kafka:9092'
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: "kafka-connect-group"
      CONNECT_CONFIG_STORAGE_TOPIC: connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: connect-status
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.converters.ByteArrayConverter   
      CONNECT_VALUE_CONVERTER: com.amazonaws.services.schemaregistry.kafkaconnect.jsonschema.JsonSchemaConverter
      CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: "true"
      CONNECT_PLUGIN_PATH: "/usr/share/java"
    ports:
      - "8083:8083"    
    volumes:
      - ./jars/kafkaConnect:/usr/share/java/kafka-connect-jdbc
      - ./jars/awsGlueRegistry:/usr/share/java/aws-glue-schema-registry

jdbc-sink-connector

{
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics": "Alerts",
    "connection.url": "jdbc:postgresql://localhost:5421/test",
    "connection.user": "myuser",
    "connection.password": "*********************",
    "insert.mode": "upsert",
    "transforms": "TimestampConverter",
    "transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
    "transforms.TimestampConverter.target.type": "Timestamp",
    "transforms.TimestampConverter.field": "timeStampCreated",
    "errors.tolerance": "all",
    "pk.mode": "record_value",
    "pk.fields": "accountAlertType,accountId",
    "table.name.format": "public.accountAlerts",
    "connect.task.restart.enable": "true",
    "connect.task.restart.backoff.ms": "10000",
    "errors.log.enable": "true",
    "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "key.converter.schemas.enable": "false",
    "value.converter": "com.amazonaws.services.schemaregistry.kafkaconnect.jsonschema.JsonSchemaConverter",
    "value.converter.schemas.enable": "false",
    "value.converter.dataFormat": "JSON",
    "value.converter.region": "us-east-2",
    "value.converter.registry.name": "alerts-poc",
    "value.converter.schemaName": "Alerts-value",
    "internal.key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "internal.key.converter.schemas.enable": "false",
    "internal.value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "internal.value.converter.schemas.enable": "false"
  }
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant