Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GEOMESA-3439 Converters - fix thread-safety of md5 function #3265

Merged
merged 2 commits into from
Jan 27, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,16 @@ class IdFunctionFactory extends TransformerFunctionFactory with LazyLogging {
}

/**
 * Transformer function `md5`: computes the MD5 digest of its first argument and returns the
 * digest passed through `ByteArrays.toHex`.
 *
 * The argument must be a `String` (hashed as its UTF-8 bytes) or a `byte[]` (hashed as-is);
 * any other value raises an `IllegalArgumentException`.
 */
private val md5: TransformerFunction = new NamedTransformerFunction(Seq("md5"), pure = true) {
  // MessageDigest instances are stateful, so every calling thread gets its own instance
  // instead of sharing a single one across concurrent invocations
  private val hashers = new ThreadLocal[MessageDigest]() {
    override def initialValue(): MessageDigest = MessageDigest.getInstance("MD5")
  }

  override def apply(args: Array[AnyRef]): AnyRef = {
    val bytes = args(0) match {
      case s: String      => s.getBytes(StandardCharsets.UTF_8)
      case b: Array[Byte] => b
      case a => throw new IllegalArgumentException(s"Expected String or byte[] but got: $a")
    }
    // digest(bytes) updates, completes and resets the digest, leaving it ready for the next call
    ByteArrays.toHex(hashers.get.digest(bytes))
  }
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/***********************************************************************
* Copyright (c) 2013-2025 Commonwealth Computer Research, Inc.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Apache License, Version 2.0
* which accompanies this distribution and is available at
* http://www.opensource.org/licenses/apache2.0.php.
***********************************************************************/

package org.locationtech.geomesa.convert2.transforms

import org.locationtech.geomesa.utils.concurrent.CachedThreadPool
import org.specs2.matcher.MatchResult
import org.specs2.mutable.Specification

import java.nio.charset.StandardCharsets
import java.util.Collections
import java.util.concurrent.{ConcurrentHashMap, CopyOnWriteArrayList, TimeUnit}
import scala.util.Random

/**
 * Checks that the hashing transformer functions return stable results when a single
 * expression instance is evaluated from many threads at once.
 */
class IdFunctionFactoryTest extends Specification {

  "IdFunctionFactoryTest" should {
    "generate hashes in a thread-safe way" in {
      testHash("md5")
      testHash("murmurHash3")
      testHash("murmur3_32")
      testHash("murmur3_64")
    }
  }

  /**
   * Evaluates `alg($0)` for three fixed inputs, once on the calling thread as a baseline and then
   * 1000 more times (inputs picked pseudo-randomly) on a thread pool. Every input must only ever
   * produce a single distinct result, and no evaluation may throw.
   *
   * @param alg name of the hash function to invoke in the expression
   * @return the final expectation (no exceptions were recorded)
   */
  def testHash(alg: String): MatchResult[_] = {
    val exp = Expression(s"$alg($$0)")
    // one concurrent set per input - any corrupted hash shows up as a second element in the set
    val results = Array.fill(3)(Collections.newSetFromMap(new ConcurrentHashMap[AnyRef, java.lang.Boolean]()))
    val exceptions = new CopyOnWriteArrayList[Throwable]()
    val runnables = Array("foo", "bar", "blubaz").map(_.getBytes(StandardCharsets.UTF_8)).zipWithIndex.map {
      case (input, i) =>
        new Runnable() {
          override def run(): Unit = {
            // record every failure so that errors raised on pool threads fail the test
            // instead of being lost inside the executor
            try { results(i).add(exp.apply(Array(input))) } catch {
              case e: Throwable => exceptions.add(e)
            }
          }
        }
    }
    // baseline results
    runnables.foreach(_.run())

    val r = new Random(-1)
    val pool = new CachedThreadPool(10)
    try {
      (0 until 1000).foreach(_ => pool.submit(runnables(r.nextInt(3))))
    } finally {
      pool.shutdown()
    }
    // the assertions below are only meaningful once every submitted task has completed, so fail
    // explicitly if the pool did not drain; the wait returns as soon as the pool terminates, so
    // the generous timeout costs nothing on the normal path
    pool.awaitTermination(10, TimeUnit.SECONDS) must beTrue
    foreach(results)(_ must haveSize(1))
    exceptions must haveSize(0)
  }
}
Loading