Skip to content

Commit

Permalink
GEOMESA-3442 Accumulo - throw errors if tables are not created before…
Browse files Browse the repository at this point in the history
… timeout (#3269)
  • Loading branch information
elahrvivaz authored Jan 31, 2025
1 parent 2d14922 commit fe3985f
Showing 1 changed file with 64 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import org.locationtech.geomesa.utils.zk.ZookeeperLocking

import java.io.Closeable
import java.util.concurrent.TimeUnit
import scala.util.control.NonFatal

/**
* Manages table creation/deletion
Expand Down Expand Up @@ -154,24 +155,40 @@ object TableManager {
*/
def ensureTableExists(table: String, timeType: TimeType): Boolean = {
var created = false
tableCache.get(table, _ => {
withLock(tablePath(table), timeoutMillis, {
val tableOps = client.tableOperations()
if (!tableOps.exists(table)) {
try {
tableOps.create(table, new NewTableConfiguration().setTimeType(timeType))
created = true
} catch {
case _: TableExistsException => onTableExists(table)
}
val start = System.currentTimeMillis()
while (!tableOps.exists(table) && System.currentTimeMillis() - start < timeoutMillis) {
Thread.sleep(10)
var err: Throwable = null

// noinspection ScalaUnusedSymbol
def load(ignoredKey: String): java.lang.Boolean = {
try {
withLock(tablePath(table), timeoutMillis, {
val tableOps = client.tableOperations()
if (!tableOps.exists(table)) {
try {
tableOps.create(table, new NewTableConfiguration().setTimeType(timeType))
created = true
} catch {
case _: TableExistsException => onTableExists(table)
}
val start = System.currentTimeMillis()
while (!tableOps.exists(table)) {
if (System.currentTimeMillis() - start > timeoutMillis) {
throw new RuntimeException(
s"Tried to create table '$table', but it has not been created after timeout of ${timeoutMillis}ms")
}
Thread.sleep(10)
}
}
}
})
java.lang.Boolean.FALSE
})
})
java.lang.Boolean.FALSE
} catch {
case NonFatal(e) => err = e; throw e
}
}

tableCache.get(table, load)
if (err != null) {
throw err
}
created
}

Expand All @@ -183,21 +200,37 @@ object TableManager {
*/
def ensureNamespaceExists(namespace: String): Unit = {
val ns = namespace.takeWhile(_ != '.')
nsCache.get(ns, _ => {
withLock(nsPath(ns), timeoutMillis, {
val nsOps = client.namespaceOperations
if (!nsOps.exists(ns)) {
try { nsOps.create(ns) } catch {
case _: NamespaceExistsException => onNamespaceExists(ns)
}
val start = System.currentTimeMillis()
while (!nsOps.exists(ns) && System.currentTimeMillis() - start < timeoutMillis) {
Thread.sleep(10)
var err: Throwable = null

// noinspection ScalaUnusedSymbol
def load(ignoredKey: String): java.lang.Boolean = {
try {
withLock(nsPath(ns), timeoutMillis, {
val nsOps = client.namespaceOperations
if (!nsOps.exists(ns)) {
try { nsOps.create(ns) } catch {
case _: NamespaceExistsException => onNamespaceExists(ns)
}
val start = System.currentTimeMillis()
while (!nsOps.exists(ns)) {
if (System.currentTimeMillis() - start > timeoutMillis) {
throw new RuntimeException(
s"Tried to create namespace '$ns', but it has not been created after timeout of ${timeoutMillis}ms")
}
Thread.sleep(10)
}
}
}
})
java.lang.Boolean.FALSE
})
})
java.lang.Boolean.FALSE
} catch {
case NonFatal(e) => err = e; throw e
}
}

nsCache.get(ns, load)
if (err != null) {
throw err
}
}

/**
Expand Down

0 comments on commit fe3985f

Please sign in to comment.