Skip to content

Commit

Permalink
Fixed indentation
Browse files Browse the repository at this point in the history
  • Loading branch information
badrinathpatchikolla committed Aug 31, 2023
1 parent 47c2348 commit 75ef8ef
Showing 1 changed file with 68 additions and 95 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,99 +13,72 @@ import scala.concurrent.duration.Duration
import scala.util.{Failure, Success, Try}

final case class Response(
`__ID__`: String,
`__BODY__`: Option[String] = None,
`__HEADER__`: Map[String, Seq[String]] = Map(),
`__STATUS_CODE__`: Option[Int] = None,
`__STATUS_MSG__`: Option[String] = None,
`__ERROR__`: Option[String] = None,
`__ELAPSED_TIME__`: Long,
`__URL__`: String,
`__DATA__`: String)
`__ID__`: String,
`__BODY__`: Option[String] = None,
`__HEADER__`: Map[String, Seq[String]] = Map(),
`__STATUS_CODE__`: Option[Int] = None,
`__STATUS_MSG__`: Option[String] = None,
`__ERROR__`: Option[String] = None,
`__ELAPSED_TIME__`: Long,
`__URL__`: String,
`__DATA__`: String)

final case class ResponseBatch(
`__ID__`: Seq[String],
`__BODY__`: Option[String] = None,
`__HEADER__`: Map[String, Seq[String]] = Map(),
`__STATUS_CODE__`: Option[Int] = None,
`__STATUS_MSG__`: Option[String] = None,
`__ERROR__`: Option[String] = None,
`__ELAPSED_TIME__`: Long,
`__URL__`: String,
`__DATA__`: String)
`__ID__`: Seq[String],
`__BODY__`: Option[String] = None,
`__HEADER__`: Map[String, Seq[String]] = Map(),
`__STATUS_CODE__`: Option[Int] = None,
`__STATUS_MSG__`: Option[String] = None,
`__ERROR__`: Option[String] = None,
`__ELAPSED_TIME__`: Long,
`__URL__`: String,
`__DATA__`: String)


object Alias {
val DataCol = "__DATA__"
val IdCol = "__ID__"
val UrlCol = "__URL__"
val ParamsCol = "__PARAMS__"
val HiddenParamsCol = "__HIDDEN_PARAMS__"
val HeadersCol = "__HEADERS__"
}


private[almaren] case class HTTP(
headers: Map[String, String],
params: Map[String, String],
hiddenParams: Map[String, String],
method: String,
requestHandler: (Row, Session, String, Map[String, String], Map[String, String], String, Int, Int) => requests.Response,
session: () => requests.Session,
connectTimeout: Int,
readTimeout: Int,
threadPoolSize: Int,
batchSize: Int) extends Main {

private def columnExists(df: DataFrame, colName: String): Option[String] =
Some(colName).filter(df.columns.contains)

private def getRowParam(row: Row, colNameExists: Option[String]): Map[String, String] =
colNameExists.map(name => row.getAs[Map[String, String]](name)).getOrElse(Map())
headers: Map[String, String],
params: Map[String, String],
hiddenParams: Map[String, String],
method: String,
requestHandler: (Row, Session, String, Map[String, String], Map[String, String], String, Int, Int) => requests.Response,
session: () => requests.Session,
connectTimeout: Int,
readTimeout: Int,
threadPoolSize: Int,
batchSize: Int) extends Main {

override def core(df: DataFrame): DataFrame = {
logger.info(s"headers:{$headers},params:{$params}, method:{$method}, connectTimeout:{$connectTimeout}, readTimeout{$readTimeout}, threadPoolSize:{$threadPoolSize}, batchSize:{$batchSize}")

import df.sparkSession.implicits._

val headersColExists = columnExists(df,Alias.HeadersCol)
val paramsColExists = columnExists(df,Alias.ParamsCol)
val hiddenParamsColExists = columnExists(df,Alias.HiddenParamsCol)

val result = df.mapPartitions(partition => {

implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(threadPoolSize))
val data: Iterator[Future[Seq[Response]]] = partition.grouped(batchSize).map(rows => Future {
val s = session()
rows.map(row => request(row, s, headersColExists, paramsColExists, hiddenParamsColExists))
rows.map(row => request(row, s))
})
val requests: Future[Iterator[Seq[Response]]] = Future.sequence(data)
Await.result(requests, Duration.Inf).flatten
})
result.toDF
}

private def request(row: Row, session: Session, headersColExists: Option[String], paramsColExists: Option[String], hiddenParamsColExists: Option[String]): Response = {
private def request(row: Row, session: Session): Response = {
val url = row.getAs[Any](Alias.UrlCol).toString
val startTime = System.currentTimeMillis()

val headersRow = getRowParam(row, headersColExists)
val paramsRow = getRowParam(row, paramsColExists)
val hiddenParamsRow = getRowParam(row, hiddenParamsColExists)

val allHeaders = headers ++ headersRow
val allParams = params ++ paramsRow
val allHiddenParams = hiddenParams ++ hiddenParamsRow

logger.info(s"headers:{$allHeaders},params:{$allParams}")
val response = Try(requestHandler(row, session, url, allHeaders, allParams ++ allHiddenParams, method, connectTimeout, readTimeout))
val response = Try(requestHandler(row, session, url, headers, params ++ hiddenParams, method, connectTimeout, readTimeout))
val elapsedTime = System.currentTimeMillis() - startTime
val id = row.getAs[Any](Alias.IdCol).toString

val data = method.toUpperCase match {
case "PUT" | "POST" | "DELETE" => row.getAs[Any](Alias.DataCol).toString
case _ => ""
}
val data = row.getAs[Any](Alias.DataCol).toString

def getResponse(r: requests.Response) = Response(
id,
Expand All @@ -126,25 +99,25 @@ private[almaren] case class HTTP(
case Failure(re: RequestFailedException) => getResponse(re.response)
case Failure(f) => {
logger.error("Almaren HTTP Request Error", f)
Response(id, `__ERROR__` = Some(f.getMessage), `__ELAPSED_TIME__` = elapsedTime, `__URL__` = url,`__DATA__` = data)
Response(id, `__ERROR__` = Some(f.getMessage), `__ELAPSED_TIME__` = elapsedTime, `__URL__` = url, `__DATA__` = data)
}
}
}
}

private[almaren] case class HTTPBatch(
url: String,
headers: Map[String, String],
params: Map[String, String],
hiddenParams: Map[String, String],
method: String,
requestHandler: (String, Session, String, Map[String, String], Map[String, String], String, Int, Int) => requests.Response,
session: () => requests.Session,
connectTimeout: Int,
readTimeout: Int,
batchSize: Int,
batchDelimiter: (Seq[Row]) => String
) extends Main {
url: String,
headers: Map[String, String],
params: Map[String, String],
hiddenParams: Map[String, String],
method: String,
requestHandler: (String, Session, String, Map[String, String], Map[String, String], String, Int, Int) => requests.Response,
session: () => requests.Session,
connectTimeout: Int,
readTimeout: Int,
batchSize: Int,
batchDelimiter: (Seq[Row]) => String
) extends Main {

override def core(df: DataFrame): DataFrame = {
logger.info(s"url:{$url}, headers:{$headers},params:{$params}, method:{$method}, connectTimeout:{$connectTimeout}, readTimeout{$readTimeout}, batchSize:{$batchSize}")
Expand Down Expand Up @@ -202,16 +175,16 @@ private[almaren] case class HTTPBatch(
private[almaren] trait HTTPConnector extends Core {

def http(
headers: Map[String, String] = Map(),
params: Map[String, String] = Map(),
hiddenParams: Map[String, String] = Map(),
method: String,
requestHandler: (Row, Session, String, Map[String, String], Map[String, String], String, Int, Int) => requests.Response = HTTPConn.defaultHandler,
session: () => requests.Session = HTTPConn.defaultSession,
connectTimeout: Int = 60000,
readTimeout: Int = 1000,
threadPoolSize: Int = 1,
batchSize: Int = 5000): Option[Tree] =
headers: Map[String, String] = Map(),
params: Map[String, String] = Map(),
hiddenParams: Map[String, String] = Map(),
method: String,
requestHandler: (Row, Session, String, Map[String, String], Map[String, String], String, Int, Int) => requests.Response = HTTPConn.defaultHandler,
session: () => requests.Session = HTTPConn.defaultSession,
connectTimeout: Int = 60000,
readTimeout: Int = 1000,
threadPoolSize: Int = 1,
batchSize: Int = 5000): Option[Tree] =
HTTP(
headers,
params,
Expand All @@ -226,18 +199,18 @@ private[almaren] trait HTTPConnector extends Core {
)

def httpBatch(
url: String,
headers: Map[String, String] = Map(),
params: Map[String, String] = Map(),
hiddenParams: Map[String, String] = Map(),
method: String,
requestHandler: (String, Session, String, Map[String, String], Map[String, String], String, Int, Int) => requests.Response = HTTPConn.defaultHandlerBatch,
session: () => requests.Session = HTTPConn.defaultSession,
connectTimeout: Int = 60000,
readTimeout: Int = 1000,
batchSize: Int = 5000,
batchDelimiter: (Seq[Row]) => String = HTTPConn.defaultBatchDelimiter
): Option[Tree] =
url: String,
headers: Map[String, String] = Map(),
params: Map[String, String] = Map(),
hiddenParams: Map[String, String] = Map(),
method: String,
requestHandler: (String, Session, String, Map[String, String], Map[String, String], String, Int, Int) => requests.Response = HTTPConn.defaultHandlerBatch,
session: () => requests.Session = HTTPConn.defaultSession,
connectTimeout: Int = 60000,
readTimeout: Int = 1000,
batchSize: Int = 5000,
batchDelimiter: (Seq[Row]) => String = HTTPConn.defaultBatchDelimiter
): Option[Tree] =
HTTPBatch(
url,
headers,
Expand Down Expand Up @@ -283,4 +256,4 @@ object HTTPConn {
val defaultSession = () => requests.Session()

implicit class HTTPImplicit(val container: Option[Tree]) extends HTTPConnector
}
}

0 comments on commit 75ef8ef

Please sign in to comment.