Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix duplication file name for batch message #1101

Closed
wants to merge 1 commit into from

Conversation

shibd
Copy link
Member

@shibd shibd commented Nov 4, 2024

Motivation

If sink batch message, will get an error for azure blob storage:

2024-11-04T01:43:57,072+0000 [pulsar-io-cloud-storage-sink-flush-0] ERROR org.apache.pulsar.io.jcloud.sink.BlobStoreAbstractSink - Encountered unknown error writing to blob abhijith/testing-eh-kaka-sync/unbuffered-telemetryfeed-partition-28/2024-10-30/799132352513.json
com.azure.storage.blob.models.BlobStorageException: Status code 409, "<?xml version="1.0" encoding="utf-8"?><Error><Code>BlobAlreadyExists</Code><Message>The specified blob already exists.
RequestId:4506238d-001e-00df-085b-2eae40000000
Time:2024-11-04T01:43:57.0681430Z</Message></Error>"
	at java.lang.invoke.MethodHandle.invokeWithArguments(MethodHandle.java:732) ~[?:?]
	at com.azure.core.implementation.MethodHandleReflectiveInvoker.invokeWithArguments(MethodHandleReflectiveInvoker.java:39) ~[azure-core-1.47.0.jar:1.47.0]
	at com.azure.core.implementation.http.rest.ResponseExceptionConstructorCache.invoke(ResponseExceptionConstructorCache.java:53) ~[azure-core-1.47.0.jar:1.47.0]
	at com.azure.core.implementation.http.rest.RestProxyBase.instantiateUnexpectedException(RestProxyBase.java:411) ~[azure-core-1.47.0.jar:1.47.0]
	at com.azure.core.implementation.http.rest.AsyncRestProxy.lambda$ensureExpectedStatus$1(AsyncRestProxy.java:132) ~[azure-core-1.47.0.jar:1.47.0]
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:113) ~[reactor-core-3.4.34.jar:3.4.34]
	at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2400) ~[reactor-core-3.4.34.jar:3.4.34]
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:171) ~[reactor-core-3.4.34.jar:3.4.34]
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:2196) ~[reactor-core-3.4.34.jar:3.4.34]
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onSubscribe(Operators.java:2070) ~[reactor-core-3.4.34.jar:3.4.34]
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:96) ~[reactor-core-3.4.34.jar:3.4.34]
	at reactor.core.publisher.MonoJust.subscribe(MonoJust.java:55) ~[reactor-core-3.4.34.jar:3.4.34]
	at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64) ~[reactor-core-3.4.34.jar:3.4.34]
	at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:157) ~[reactor-core-3.4.34.jar:3.4.34]
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129) ~[reactor-core-3.4.34.jar:3.4.34]
	at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onNext(FluxHide.java:137) ~[reactor-core-3.4.34.jar:3.4.34]
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129) ~[reactor-core-3.4.34.jar:3.4.34]
	at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onNext(FluxHide.java:137) ~[reactor-core-3.4.34.jar:3.4.34]
	at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79) ~[reactor-core-3.4.34.jar:3.4.34]
	at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1839) ~[reactor-core-3.4.34.jar:3.4.34]
	at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:151) ~[reactor-core-3.4.34.jar:3.4.34]
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129) ~[reactor-core-3.4.34.jar:3.4.34]
	at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1839) ~[reactor-core-3.4.34.jar:3.4.34]
	at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:151) ~[reactor-core-3.4.34.jar:3.4.34]
	at reactor.core.publisher.MonoCreate$DefaultMonoSink.success(MonoCreate.java:172) ~[reactor-core-3.4.34.jar:3.4.34]

The root cause is here used getSequence

return record.getRecordSequence()
.orElseThrow(() -> new RuntimeException("found empty recordSequence"));

But, in pulsar, just use LedgerId + EntryId to gen sequence

https://github.com/apache/pulsar/blob/bbc62245c5ddba1de4b1e7cee4ab49334bc36277/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java#L289-L299

Modifications

  • Reimpl this logic on this connector to quick fix: if a message is a batch, will add batchIndex.

Verifying this change

  • Add AbstractPartitionerTest to cover it, and verify on my localhost

Documentation

Check the box below.

Need to update docs?

  • doc-required

    (If you need help on updating docs, create a doc issue)

  • no-need-doc

    (Please explain why)

  • doc

    (If this PR contains doc changes)

@shibd shibd requested a review from a team as a code owner November 4, 2024 07:32
@github-actions github-actions bot added the no-need-doc This pr does not need any document label Nov 4, 2024
@shibd shibd requested a review from RobertIndie November 4, 2024 07:32
@shibd shibd closed this Nov 4, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
no-need-doc This pr does not need any document
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant