diff --git a/geomesa-accumulo/geomesa-accumulo-datastore/src/main/scala/org/locationtech/geomesa/accumulo/util/TableManager.scala b/geomesa-accumulo/geomesa-accumulo-datastore/src/main/scala/org/locationtech/geomesa/accumulo/util/TableManager.scala index 23ac83225c9..6ce623b2dc0 100644 --- a/geomesa-accumulo/geomesa-accumulo-datastore/src/main/scala/org/locationtech/geomesa/accumulo/util/TableManager.scala +++ b/geomesa-accumulo/geomesa-accumulo-datastore/src/main/scala/org/locationtech/geomesa/accumulo/util/TableManager.scala @@ -23,6 +23,7 @@ import org.locationtech.geomesa.utils.zk.ZookeeperLocking import java.io.Closeable import java.util.concurrent.TimeUnit +import scala.util.control.NonFatal /** * Manages table creation/deletion @@ -154,24 +155,40 @@ object TableManager { */ def ensureTableExists(table: String, timeType: TimeType): Boolean = { var created = false - tableCache.get(table, _ => { - withLock(tablePath(table), timeoutMillis, { - val tableOps = client.tableOperations() - if (!tableOps.exists(table)) { - try { - tableOps.create(table, new NewTableConfiguration().setTimeType(timeType)) - created = true - } catch { - case _: TableExistsException => onTableExists(table) - } - val start = System.currentTimeMillis() - while (!tableOps.exists(table) && System.currentTimeMillis() - start < timeoutMillis) { - Thread.sleep(10) + var err: Throwable = null + + // noinspection ScalaUnusedSymbol + def load(ignoredKey: String): java.lang.Boolean = { + try { + withLock(tablePath(table), timeoutMillis, { + val tableOps = client.tableOperations() + if (!tableOps.exists(table)) { + try { + tableOps.create(table, new NewTableConfiguration().setTimeType(timeType)) + created = true + } catch { + case _: TableExistsException => onTableExists(table) + } + val start = System.currentTimeMillis() + while (!tableOps.exists(table)) { + if (System.currentTimeMillis() - start > timeoutMillis) { + throw new RuntimeException( + s"Tried to create table '$table', but it has not been created after timeout of ${timeoutMillis}ms") + } + Thread.sleep(10) + } } - } - }) - java.lang.Boolean.FALSE - }) + }) + java.lang.Boolean.FALSE + } catch { + case NonFatal(e) => err = e; throw e + } + } + + tableCache.get(table, load) + if (err != null) { + throw err + } created } @@ -183,21 +200,37 @@ object TableManager { */ def ensureNamespaceExists(namespace: String): Unit = { val ns = namespace.takeWhile(_ != '.') - nsCache.get(ns, _ => { - withLock(nsPath(ns), timeoutMillis, { - val nsOps = client.namespaceOperations - if (!nsOps.exists(ns)) { - try { nsOps.create(ns) } catch { - case _: NamespaceExistsException => onNamespaceExists(ns) - } - val start = System.currentTimeMillis() - while (!nsOps.exists(ns) && System.currentTimeMillis() - start < timeoutMillis) { - Thread.sleep(10) + var err: Throwable = null + + // noinspection ScalaUnusedSymbol + def load(ignoredKey: String): java.lang.Boolean = { + try { + withLock(nsPath(ns), timeoutMillis, { + val nsOps = client.namespaceOperations + if (!nsOps.exists(ns)) { + try { nsOps.create(ns) } catch { + case _: NamespaceExistsException => onNamespaceExists(ns) + } + val start = System.currentTimeMillis() + while (!nsOps.exists(ns)) { + if (System.currentTimeMillis() - start > timeoutMillis) { + throw new RuntimeException( + s"Tried to create namespace '$ns', but it has not been created after timeout of ${timeoutMillis}ms") + } + Thread.sleep(10) + } } - } - }) - java.lang.Boolean.FALSE - }) + }) + java.lang.Boolean.FALSE + } catch { + case NonFatal(e) => err = e; throw e + } + } + + nsCache.get(ns, load) + if (err != null) { + throw err + } } /**