Commit a79923c

Merge pull request #43 from Hub-of-all-Things/v2.6.1
V2.6.1 (all checks passed)
2 parents: 2bea135 + 920aa4f

59 files changed, +2081 -479 lines changed

hat/app/org/hatdex/hat/api/controllers/Applications.scala

+2 -1

@@ -72,7 +72,8 @@ class Applications @Inject() (
   }

   def applicationStatus(id: String): Action[AnyContent] = SecuredAction(ContainsApplicationRole(Owner(), ApplicationManage(id)) || WithRole(Owner())).async { implicit request =>
-    val bustCache = request.headers.get("Cache-Control").exists(_ == "no-cache")
+    val bustCache = request.headers.get("Cache-Control").contains("no-cache")
+    logger.info(s"Getting app $id status (bust cache: $bustCache)")
     applicationsService.applicationStatus(id, bustCache).map { maybeStatus ⇒
       maybeStatus map { status ⇒
         Ok(Json.toJson(status))
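Side note on the `exists` to `contains` change: for a plain equality test on an Option the two are interchangeable, `contains` just states the intent directly. A standalone illustration (not repo code):

    val cacheControl: Option[String] = Some("no-cache")

    // Both forms answer "does the Option hold exactly this value?"
    val viaExists = cacheControl.exists(_ == "no-cache")  // predicate form
    val viaContains = cacheControl.contains("no-cache")   // direct form
    assert(viaExists == viaContains)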

hat/app/org/hatdex/hat/api/controllers/RichData.scala

+4 -2

@@ -85,13 +85,15 @@ class RichData @Inject() (
     val dataEndpoint = s"$namespace/$endpoint"
     val response = request.body match {
       case array: JsArray =>
-        val values = array.value.map(EndpointData(dataEndpoint, None, _, None))
+        // TODO: extract unique ID and timestamp
+        val values = array.value.map(EndpointData(dataEndpoint, None, None, None, _, None))
         dataService.saveData(request.identity.userId, values, skipErrors.getOrElse(false))
           .andThen(dataEventDispatcher.dispatchEventDataCreated(s"saved batch for $dataEndpoint"))
           .map(saved => Created(Json.toJson(saved)))

       case value: JsValue =>
-        val values = Seq(EndpointData(dataEndpoint, None, value, None))
+        // TODO: extract unique ID and timestamp
+        val values = Seq(EndpointData(dataEndpoint, None, None, None, value, None))
         dataService.saveData(request.identity.userId, values)
           .andThen(dataEventDispatcher.dispatchEventDataCreated(s"saved data for $dataEndpoint"))
           .map(saved => Created(Json.toJson(saved.head)))
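The extra `None, None` arguments reflect two new optional fields on `EndpointData` introduced in this release and left unset here pending the TODOs. A sketch of the assumed shape (field names inferred from this diff and the new DataSentintel service below, not copied from the model source):

    import java.util.UUID
    import org.joda.time.DateTime
    import play.api.libs.json.JsValue

    // Illustrative only: the two middle Options are the new None, None
    // arguments threaded through above.
    case class EndpointData(
      endpoint: String,                  // e.g. "namespace/endpoint"
      recordId: Option[UUID],
      sourceTimestamp: Option[DateTime], // new: timestamp extracted from the payload
      sourceUniqueId: Option[String],    // new: unique ID extracted from the payload
      data: JsValue,
      links: Option[Seq[EndpointData]])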

hat/app/org/hatdex/hat/api/service/MigrationService.scala

+3 -3

@@ -25,12 +25,12 @@
 package org.hatdex.hat.api.service

 import java.util.UUID
-import javax.inject.Inject

 import akka.NotUsed
 import akka.stream.Materializer
 import akka.stream.scaladsl.{ Flow, Keep, RunnableGraph, Sink, Source }
 import com.mohiva.play.silhouette.api.actions.SecuredRequest
+import javax.inject.Inject
 import org.hatdex.hat.api.json.HatJsonFormats
 import org.hatdex.hat.api.models.{ ApiDataRecord, _ }
 import org.hatdex.hat.api.service.monitoring.HatDataEventDispatcher

@@ -84,7 +84,7 @@ class MigrationService @Inject() (
     record.tables.map { tables =>
       val savedRecords = tables.map { table =>
         convertRecordJson(record, includeTimestamp = true, Some(table.name)) map { data =>
-          richDataService.saveData(userId, Seq(EndpointData(s"${table.source}/${table.name}", None, data, None)))
+          richDataService.saveData(userId, Seq(EndpointData(s"${table.source}/${table.name}", None, None, None, data, None)))
             .andThen(dataEventDispatcher.dispatchEventDataCreated(s"migrated record for ${table.source}/${table.name}"))
             .map(_ => 1)
       } getOrElse Future.successful(0)

@@ -116,7 +116,7 @@ class MigrationService @Inject() (
     } via {
       Flow[JsResult[JsObject]].map(_.get) // Unwrap Json Objects
     } via Flow[JsObject].mapAsync(parallelMigrations) { oldJson =>
-      richDataService.saveData(userId, Seq(EndpointData(endpoint, None, oldJson, None)))
+      richDataService.saveData(userId, Seq(EndpointData(endpoint, None, None, None, oldJson, None)))
        .andThen(dataEventDispatcher.dispatchEventDataCreated(s"migrated data to $endpoint"))
        .map(_ => 1L)
        .recover {
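Both hunks only widen the same `EndpointData` constructor call; the surrounding stream is unchanged. The `mapAsync(parallelMigrations)` stage it sits in is the standard Akka Streams way to cap concurrent saves while preserving record order. A self-contained sketch of that pattern (demo values are ours, not the repo's):

    import akka.actor.ActorSystem
    import akka.stream.ActorMaterializer
    import akka.stream.scaladsl.{ Sink, Source }
    import scala.concurrent.Future

    object MapAsyncDemo extends App {
      implicit val system: ActorSystem = ActorSystem("demo")
      implicit val materializer: ActorMaterializer = ActorMaterializer()
      import system.dispatcher

      val parallelMigrations = 4

      // At most 4 simulated saves run at once; outputs stay in input order.
      Source(1 to 20)
        .mapAsync(parallelMigrations)(record => Future(record * 2))
        .runWith(Sink.foreach(println))
        .onComplete(_ => system.terminate())
    }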

hat/app/org/hatdex/hat/api/service/UsersService.scala

+2 -2

@@ -174,14 +174,14 @@ class UsersService @Inject() (cache: AsyncCacheApi)(implicit ec: DalExecutionContext
   }

   def previousLogin(user: HatUser)(implicit server: HatServer): Future[Option[HatAccessLog]] = {
-    logger.error(s"Getting previous login for $user")
+    logger.debug(s"Getting previous login for $user@${server.domain}")
     val query = for {
       access <- UserAccessLog.filter(l => l.userId === user.userId).sortBy(_.date.desc).take(2).drop(1)
       user <- access.userUserFk
     } yield (access, user)
     server.db.run(query.result)
       .andThen {
-        case Success(h) ⇒ logger.error(s"Got previous logins $h")
+        case Success(h) ⇒ logger.error(s"Got previous logins $h @${server.domain}")
       }
       .map(_.headOption)
       .map(_.map(au => ModelTranslation.fromDbModel(au._1, ModelTranslation.fromDbModel(au._2))))
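Aside from the logging tweaks, the query shows the idiom for "the login before this one": sort by date descending, then `take(2).drop(1)` keeps only the second most recent row, if any. The same idiom on a plain collection (illustration only):

    val logins = Seq("2018-07-03", "2018-07-02", "2018-07-01") // newest first

    // Skip the current login, keep the one before it, if present.
    val previous = logins.take(2).drop(1).headOption // Some("2018-07-02")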

hat/app/org/hatdex/hat/api/service/applications/ApplicationsService.scala

+51 -26

@@ -24,8 +24,9 @@
 package org.hatdex.hat.api.service.applications

 import akka.Done
-import javax.inject.Inject
+import akka.actor.ActorSystem
 import com.mohiva.play.silhouette.api.Silhouette
+import javax.inject.Inject
 import org.hatdex.hat.api.models.applications.{ Application, ApplicationStatus, HatApplication, Version }
 import org.hatdex.hat.api.models.{ AccessToken, DataDebit, EndpointQuery }
 import org.hatdex.hat.api.service.richData.{ DataDebitService, RichDataDuplicateDebitException, RichDataService }

@@ -35,7 +36,7 @@ import org.hatdex.hat.authentication.models.HatUser
 import org.hatdex.hat.dal.Tables
 import org.hatdex.hat.dal.Tables.ApplicationStatusRow
 import org.hatdex.hat.resourceManagement.HatServer
-import org.hatdex.hat.utils.FutureTransformations
+import org.hatdex.hat.utils.{ FutureTransformations, Utils }
 import org.hatdex.libs.dal.HATPostgresProfile.api._
 import org.joda.time.DateTime
 import play.api.Logger

@@ -46,6 +47,7 @@ import play.api.mvc.RequestHeader

 import scala.concurrent.Future
 import scala.concurrent.duration._
+import scala.util.Success

 class ApplicationStatusCheckService @Inject() (wsClient: WSClient)(implicit val rec: RemoteExecutionContext) {

@@ -56,11 +58,13 @@ class ApplicationStatusCheckService @Inject() (wsClient: WSClient)(implicit val
   }
   }

-  protected def status(statusCheck: ApplicationStatus.External, token: String): Future[Boolean] =
+  protected def status(statusCheck: ApplicationStatus.External, token: String): Future[Boolean] = {
     wsClient.url(statusCheck.statusUrl)
       .withHttpHeaders("x-auth-token" → token)
+      .withRequestTimeout(5000.millis)
       .get()
       .map(_.status == statusCheck.expectedStatus)
+  }
 }

 class ApplicationsService @Inject() (

@@ -69,15 +73,20 @@ class ApplicationsService @Inject() (
     dataDebitService: DataDebitService,
     statusCheckService: ApplicationStatusCheckService,
     trustedApplicationProvider: TrustedApplicationProvider,
-    silhouette: Silhouette[HatApiAuthEnvironment])(implicit val ec: DalExecutionContext) {
+    silhouette: Silhouette[HatApiAuthEnvironment],
+    system: ActorSystem)(implicit val ec: DalExecutionContext) {

   private val logger = Logger(this.getClass)
   private val applicationsCacheDuration: FiniteDuration = 30.minutes

   def applicationStatus(id: String, bustCache: Boolean = false)(implicit hat: HatServer, user: HatUser, requestHeader: RequestHeader): Future[Option[HatApplication]] = {
+    val eventuallyCleanedCache = if (bustCache) {
+      cache.remove(s"apps:${hat.domain}")
+      cache.remove(appCacheKey(id))
+    }
+    else { Future.successful(Done) }
     for {
-      _ ← if (bustCache) { cache.remove(appCacheKey(id)) } else { Future.successful(Done) }
-      _ ← if (bustCache) { cache.remove(s"apps:${hat.domain}") } else { Future.successful(Done) }
+      _ ← eventuallyCleanedCache
       application ← cache.get[HatApplication](appCacheKey(id))
         .flatMap {
           case Some(application) ⇒ Future.successful(Some(application))

@@ -86,24 +95,33 @@ class ApplicationsService @Inject() (
             for {
               maybeApp <- trustedApplicationProvider.application(id)
               setup <- applicationSetupStatus(id)(hat.db)
-              status <- FutureTransformations.transform(maybeApp.map(collectStatus(_, setup)))
-              _ ← status.map(cache.set(appCacheKey(id), _, applicationsCacheDuration)).getOrElse(Future.successful(Done))
-            } yield status
+              status <- FutureTransformations.transform(maybeApp.map(refetchApplicationsStatus(_, Seq(setup).flatten)))
+              _ ← status.map(s ⇒ cache.set(appCacheKey(id), s._1, applicationsCacheDuration)).getOrElse(Future.successful(Done))
+            } yield status.map(_._1)
         }
     } yield application
   }

   def applicationStatus()(implicit hat: HatServer, user: HatUser, requestHeader: RequestHeader): Future[Seq[HatApplication]] = {
-    cache.getOrElseUpdate(s"apps:${hat.domain}", applicationsCacheDuration) {
-      for {
-        apps <- trustedApplicationProvider.applications // potentially caching
-        setup <- applicationSetupStatus()(hat.db) // database
-        statuses <- Future.sequence(apps
-          .map(a => (a, setup.find(_.id == a.id)))
-          .map(as => collectStatus(as._1, as._2)))
-        _ ← Future.sequence(statuses.map(app ⇒ cache.set(appCacheKey(app.application.id), app, applicationsCacheDuration))) // reinject all fetched items as individual cached items
-      } yield statuses
-    }
+    cache.get[Seq[HatApplication]](s"apps:${hat.domain}")
+      .flatMap({
+        case Some(applications) ⇒ Future.successful(applications)
+        case None ⇒
+          for {
+            apps ← trustedApplicationProvider.applications // potentially caching
+            setup ← applicationSetupStatus()(hat.db) // database
+            statuses ← Future.sequence(apps.map(refetchApplicationsStatus(_, setup)))
+            _ ← if (statuses.forall(_._2)) { cache.set(s"apps:${hat.domain}", statuses.unzip._1, applicationsCacheDuration) } else { Future.successful(Done) }
+          } yield statuses.unzip._1
+      })
+  }
+
+  private def refetchApplicationsStatus(app: Application, setup: Seq[Tables.ApplicationStatusRow])(implicit hat: HatServer, user: HatUser, requestHeader: RequestHeader) = {
+    val aSetup = setup.find(_.id == app.id)
+    Utils.timeFuture(s"${hat.domain} ${app.id} status", logger)(collectStatus(app, aSetup))
+      .andThen {
+        case Success((a, true)) ⇒ cache.set(appCacheKey(a.application.id), a, applicationsCacheDuration)
+      }
   }

   def setup(application: HatApplication)(implicit hat: HatServer, user: HatUser, requestHeader: RequestHeader): Future[HatApplication] = {

@@ -197,26 +215,33 @@ class ApplicationsService @Inject() (
   }
   }

+  private def fastOrDefault[T](timeout: FiniteDuration, default: T)(block: => Future[T]): Future[(T, Boolean)] = {
+    val fallback = akka.pattern.after(timeout, using = system.scheduler)(Future.successful((default, false)))
+    Future.firstCompletedOf(Seq(block.map((_, true)), fallback))
+  }
+
   private def collectStatus(app: Application, setup: Option[ApplicationStatusRow])(
     implicit
-    hat: HatServer, user: HatUser, requestHeader: RequestHeader): Future[HatApplication] = {
+    hat: HatServer, user: HatUser, requestHeader: RequestHeader): Future[(HatApplication, Boolean)] = { // return status as well as flag indicating if it is successfully generated

     setup match {
       case Some(ApplicationStatusRow(_, version, true)) =>
+        val eventualStatus = fastOrDefault(5.seconds, (false, ""))(checkStatus(app))
+        val eventualMostRecentData = fastOrDefault(5.seconds, Option[DateTime](null))(mostRecentDataTime(app))
         for {
-          (status, _) <- checkStatus(app)
-          mostRecentData <- mostRecentDataTime(app)
+          ((status, _), canCacheStatus) <- eventualStatus
+          (mostRecentData, canCacheData) <- eventualMostRecentData
         } yield {
           logger.debug(s"Check compatibility between $version and new ${app.status}: ${Version(version).greaterThan(app.status.compatibility)}")
-          HatApplication(app, setup = true, enabled = true, active = status,
+          (HatApplication(app, setup = true, enabled = true, active = status,
             Some(app.status.compatibility.greaterThan(Version(version))), // Needs updating if setup version beyond compatible
-            mostRecentData)
+            mostRecentData), canCacheStatus && canCacheData)
         }
       case Some(ApplicationStatusRow(_, _, false)) =>
         // If application has been disabled, reflect in status
-        Future.successful(HatApplication(app, setup = true, enabled = false, active = false, needsUpdating = None, mostRecentData = None))
+        Future.successful((HatApplication(app, setup = true, enabled = false, active = false, needsUpdating = None, mostRecentData = None), true))
       case None =>
-        Future.successful(HatApplication(app, setup = false, enabled = false, active = false, needsUpdating = None, mostRecentData = None))
+        Future.successful((HatApplication(app, setup = false, enabled = false, active = false, needsUpdating = None, mostRecentData = None), true))
     }
   }
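The heart of this file's change is `fastOrDefault`: every status probe now races a scheduled fallback, and the Boolean in the pair records whether the real computation won, so the caches above only keep fully fresh results. A runnable sketch of the same pattern in isolation (demo names are ours, not the repo's):

    import akka.actor.ActorSystem
    import scala.concurrent.Future
    import scala.concurrent.duration._

    object FastOrDefaultDemo extends App {
      implicit val system: ActorSystem = ActorSystem("demo")
      import system.dispatcher

      // Race a computation against a timed default; the Boolean reports
      // whether the real computation finished within the budget.
      def fastOrDefault[T](timeout: FiniteDuration, default: T)(block: => Future[T]): Future[(T, Boolean)] = {
        val fallback = akka.pattern.after(timeout, using = system.scheduler)(Future.successful((default, false)))
        Future.firstCompletedOf(Seq(block.map((_, true)), fallback))
      }

      // A 2-second probe against a 1-second budget: the default wins.
      fastOrDefault(1.second, "unknown") {
        akka.pattern.after(2.seconds, using = system.scheduler)(Future.successful("live status"))
      } foreach {
        case (value, fresh) =>
          println(s"got '$value' (fresh: $fresh)") // got 'unknown' (fresh: false)
          system.terminate()
      }
    }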

hat/app/org/hatdex/hat/api/service/monitoring/HatDataEventBus.scala

+1 -1

@@ -86,7 +86,7 @@ class HatDataEventDispatcher @Inject() (dataEventBus: HatDataEventBus) {
   protected val logger: Logger = Logger(this.getClass)

   def dispatchEventDataCreated(message: String)(implicit request: SecuredRequest[HatApiAuthEnvironment, _]): PartialFunction[Try[Seq[EndpointData]], Unit] = {
-    case Success(saved) =>
+    case Success(saved) if saved.nonEmpty =>
       logger.debug(s"Dispatch data created event: $message")
       dataEventBus.publish(HatDataEventBus.DataCreatedEvent(
         request.dynamicEnvironment.domain,
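The guard works because `Future.andThen` takes a PartialFunction: with an empty `saved` the case is simply undefined, so nothing is dispatched and the future's value passes through untouched. A minimal standalone illustration:

    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.concurrent.Future
    import scala.util.{ Success, Try }

    val dispatch: PartialFunction[Try[Seq[Int]], Unit] = {
      case Success(saved) if saved.nonEmpty => println(s"dispatching ${saved.size} events")
    }

    Future.successful(Seq.empty[Int]).andThen(dispatch) // guard unmet: no-op
    Future.successful(Seq(1, 2, 3)).andThen(dispatch)   // prints "dispatching 3 events"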
hat/app/org/hatdex/hat/api/service/richData/DataSentintel.scala (new file; path inferred from its package and class name)

+136 -0

@@ -0,0 +1,136 @@
/*
 * Copyright (C) 2017 HAT Data Exchange Ltd
 * SPDX-License-Identifier: AGPL-3.0
 *
 * This file is part of the Hub of All Things project (HAT).
 *
 * HAT is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License
 * as published by the Free Software Foundation, version 3 of
 * the License.
 *
 * HAT is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See
 * the GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General
 * Public License along with this program. If not, see
 * <http://www.gnu.org/licenses/>.
 *
 * Written by Andrius Aucinas <andrius.aucinas@hatdex.org>
 * 7 / 2018
 */

package org.hatdex.hat.api.service.richData

import akka.Done
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{ Flow, Sink, Source }
import javax.inject.Inject
import org.hatdex.hat.api.service.DalExecutionContext
import org.hatdex.hat.dal.Tables._
import org.hatdex.libs.dal.HATPostgresProfile.api._
import org.joda.time.DateTime
import org.joda.time.format.{ DateTimeFormat, DateTimeFormatter, ISODateTimeFormat }
import play.api.libs.json._

import scala.concurrent.Future

class DataSentintel @Inject() (implicit ec: DalExecutionContext, actorSystem: ActorSystem) {

  protected implicit val materializer: ActorMaterializer = ActorMaterializer()
  protected val updateBatchSize = 500

  // def validateDataStructure(data: JsValue, configuration: EndpointConfiguration): (JsValue, Option[String], Option[DateTime]) = {
  //   throw new RuntimeException("Not Implemented")
  // }

  def ensureUniquenessKey(source: String, key: String)(implicit db: Database): Future[Done] = {
    import com.github.tminglei.slickpg.window.PgWindowFuncSupport.WindowFunctions._
    val config = EndpointConfiguration(Some(key), None, None)

    val dbJsonPath = key.split('.').toList

    val clashingRecords = DataJson
      .filter(_.source === source) // only records for this source
      .filterNot(_.data.#>>(dbJsonPath) === "") // and only those that do have the key defined
      .map(r ⇒ (r.recordId, rowNumber().over.partitionBy(r.source, r.data #>> dbJsonPath).sortBy(r.date.desc))) // number the rows starting from most recent
      .subquery // subquery used to force generating the query before (incorrectly) Slick tries to use the partition windowing function within where clause
      .filter { case (_, rank) ⇒ rank > 1L } // skip the newest row
      .map(_._1) // get record ID for each remaining row

    val deleteQuery = DataJson.filter(_.recordId in clashingRecords).delete

    val updatingStream = Source.fromPublisher(db.stream(DataJson.filter(_.source === source).result.transactionally.withStatementParameters(fetchSize = updateBatchSize)))
      .via(Flow[DataJsonRow].grouped(updateBatchSize))
      .mapAsync(1)({ batch ⇒
        db.run(DBIO.sequence(
          batch
            .map(r ⇒ r.copy(sourceUniqueId = config.getKey(r.data))) // take the value at key as either string or number
            .map(DataJson.insertOrUpdate)).transactionally) // update the row with sourceUniqueId inserted
      })

    db.run(deleteQuery)
      .flatMap(_ ⇒ {
        updatingStream.runWith(Sink.ignore) // the result is not important as long as it succeeds
      })
      .map(_ ⇒ Done)
  }

  def updateSourceTimestamp(source: String, key: String, format: String = "")(implicit db: Database): Future[Done] = {
    val config = EndpointConfiguration(None, Some(key), Some(format))

    val updatingStream = Source.fromPublisher(db.stream(DataJson.filter(_.source === source).result.transactionally.withStatementParameters(fetchSize = updateBatchSize)))
      .via(Flow[DataJsonRow].grouped(updateBatchSize))
      .mapAsync(parallelism = 1)({ batch ⇒
        db.run(DBIO.sequence(
          batch
            .map(r ⇒ r.copy(sourceTimestamp = config.getTimestamp(r.data))) // take the value at key as DateTime
            .map(DataJson.insertOrUpdate)).transactionally) // update the row with sourceTimestamp inserted
      })

    updatingStream
      .runWith(Sink.ignore) // the result is not important as long as it succeeds
      .map(_ ⇒ Done)
  }
}

case class EndpointConfiguration(
    keyField: Option[String],
    timestampField: Option[String],
    timestampFormat: Option[String] // Joda DateTime format, or the "'epoch'" literal to denote seconds since epoch; defaults to ISO8601
) {
  lazy val keyPath: Option[JsPath] = keyField.map(JsonDataTransformer.parseJsPath)
  lazy val timestampPath: Option[JsPath] = timestampField.map(JsonDataTransformer.parseJsPath)
  private implicit val timestampJsonFormat = jodaDateReads(timestampFormat.getOrElse(""))

  def getKey(d: JsValue): Option[String] = keyPath.flatMap(k ⇒ k.asSingleJson(d).asOpt[String]
    .orElse(k.asSingleJson(d).asOpt[Long].map(_.toString)))

  def getTimestamp(d: JsValue): Option[DateTime] = timestampPath.flatMap(_.asSingleJson(d).asOpt[DateTime])

  protected def jodaDateReads(pattern: String): Reads[DateTime] = new Reads[DateTime] {

    private val (df, corrector, numberCorrector): (DateTimeFormatter, String ⇒ String, Long ⇒ Long) =
      pattern match {
        case ""        ⇒ (ISODateTimeFormat.dateOptionalTimeParser, identity[String], identity[Long])
        case "'epoch'" ⇒ (ISODateTimeFormat.dateOptionalTimeParser, identity[String], { x: Long ⇒ x * 1000L })
        case _         ⇒ (DateTimeFormat.forPattern(pattern), identity[String], identity[Long])
      }

    def reads(json: JsValue): JsResult[DateTime] = json match {
      case JsNumber(d) => JsSuccess(new DateTime(numberCorrector(d.toLong)))
      case JsString(s) => parseDate(corrector(s)) match {
        case Some(d) => JsSuccess(d)
        case _       => JsError(Seq(JsPath() -> Seq(JsonValidationError("error.expected.jodadate.format", pattern))))
      }
      case _ => JsError(Seq(JsPath() -> Seq(JsonValidationError("error.expected.date"))))
    }

    private def parseDate(input: String): Option[DateTime] =
      scala.util.control.Exception.nonFatalCatch[DateTime] opt (DateTime.parse(input, df))
  }
}
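The `'epoch'` branch of `jodaDateReads` only swaps the numeric corrector: JSON numbers are read as seconds and scaled to milliseconds for Joda's constructor, while strings still go through the ISO8601 parser. A usage sketch under the semantics defined above (assumes `EndpointConfiguration` is in scope and the timestamp key is a top-level field):

    import play.api.libs.json.Json

    val isoConfig = EndpointConfiguration(None, Some("created_at"), None) // no format given: ISO8601
    val epochConfig = EndpointConfiguration(None, Some("timestamp"), Some("'epoch'"))

    isoConfig.getTimestamp(Json.obj("created_at" -> "2018-07-01T12:00:00Z"))
    // Some(2018-07-01T12:00:00.000Z)

    epochConfig.getTimestamp(Json.obj("timestamp" -> 1530446400L))
    // Some(...): 1530446400 seconds scaled to milliseconds before new DateTime(...)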
