diff --git a/src/main/scala/com/github/music/of/the/ainur/almaren/http/HTTPConnector.scala b/src/main/scala/com/github/music/of/the/ainur/almaren/http/HTTPConnector.scala index 76d0e38..151b16d 100644 --- a/src/main/scala/com/github/music/of/the/ainur/almaren/http/HTTPConnector.scala +++ b/src/main/scala/com/github/music/of/the/ainur/almaren/http/HTTPConnector.scala @@ -13,71 +13,58 @@ import scala.concurrent.duration.Duration import scala.util.{Failure, Success, Try} final case class Response( - `__ID__`: String, - `__BODY__`: Option[String] = None, - `__HEADER__`: Map[String, Seq[String]] = Map(), - `__STATUS_CODE__`: Option[Int] = None, - `__STATUS_MSG__`: Option[String] = None, - `__ERROR__`: Option[String] = None, - `__ELAPSED_TIME__`: Long, - `__URL__`: String, - `__DATA__`: String) + `__ID__`: String, + `__BODY__`: Option[String] = None, + `__HEADER__`: Map[String, Seq[String]] = Map(), + `__STATUS_CODE__`: Option[Int] = None, + `__STATUS_MSG__`: Option[String] = None, + `__ERROR__`: Option[String] = None, + `__ELAPSED_TIME__`: Long, + `__URL__`: String, + `__DATA__`: String) final case class ResponseBatch( - `__ID__`: Seq[String], - `__BODY__`: Option[String] = None, - `__HEADER__`: Map[String, Seq[String]] = Map(), - `__STATUS_CODE__`: Option[Int] = None, - `__STATUS_MSG__`: Option[String] = None, - `__ERROR__`: Option[String] = None, - `__ELAPSED_TIME__`: Long, - `__URL__`: String, - `__DATA__`: String) + `__ID__`: Seq[String], + `__BODY__`: Option[String] = None, + `__HEADER__`: Map[String, Seq[String]] = Map(), + `__STATUS_CODE__`: Option[Int] = None, + `__STATUS_MSG__`: Option[String] = None, + `__ERROR__`: Option[String] = None, + `__ELAPSED_TIME__`: Long, + `__URL__`: String, + `__DATA__`: String) object Alias { val DataCol = "__DATA__" val IdCol = "__ID__" val UrlCol = "__URL__" - val ParamsCol = "__PARAMS__" - val HiddenParamsCol = "__HIDDEN_PARAMS__" - val HeadersCol = "__HEADERS__" } private[almaren] case class HTTP( - headers: Map[String, String], - params: Map[String, String], - hiddenParams: Map[String, String], - method: String, - requestHandler: (Row, Session, String, Map[String, String], Map[String, String], String, Int, Int) => requests.Response, - session: () => requests.Session, - connectTimeout: Int, - readTimeout: Int, - threadPoolSize: Int, - batchSize: Int) extends Main { - - private def columnExists(df: DataFrame, colName: String): Option[String] = - Some(colName).filter(df.columns.contains) - - private def getRowParam(row: Row, colNameExists: Option[String]): Map[String, String] = - colNameExists.map(name => row.getAs[Map[String, String]](name)).getOrElse(Map()) + headers: Map[String, String], + params: Map[String, String], + hiddenParams: Map[String, String], + method: String, + requestHandler: (Row, Session, String, Map[String, String], Map[String, String], String, Int, Int) => requests.Response, + session: () => requests.Session, + connectTimeout: Int, + readTimeout: Int, + threadPoolSize: Int, + batchSize: Int) extends Main { override def core(df: DataFrame): DataFrame = { logger.info(s"headers:{$headers},params:{$params}, method:{$method}, connectTimeout:{$connectTimeout}, readTimeout{$readTimeout}, threadPoolSize:{$threadPoolSize}, batchSize:{$batchSize}") import df.sparkSession.implicits._ - val headersColExists = columnExists(df,Alias.HeadersCol) - val paramsColExists = columnExists(df,Alias.ParamsCol) - val hiddenParamsColExists = columnExists(df,Alias.HiddenParamsCol) - val result = df.mapPartitions(partition => { implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(threadPoolSize)) val data: Iterator[Future[Seq[Response]]] = partition.grouped(batchSize).map(rows => Future { val s = session() - rows.map(row => request(row, s, headersColExists, paramsColExists, hiddenParamsColExists)) + rows.map(row => request(row, s)) }) val requests: Future[Iterator[Seq[Response]]] = Future.sequence(data) Await.result(requests, Duration.Inf).flatten @@ -85,27 +72,13 @@ private[almaren] case class HTTP( result.toDF } - private def request(row: Row, session: Session, headersColExists: Option[String], paramsColExists: Option[String], hiddenParamsColExists: Option[String]): Response = { + private def request(row: Row, session: Session): Response = { val url = row.getAs[Any](Alias.UrlCol).toString val startTime = System.currentTimeMillis() - - val headersRow = getRowParam(row, headersColExists) - val paramsRow = getRowParam(row, paramsColExists) - val hiddenParamsRow = getRowParam(row, hiddenParamsColExists) - - val allHeaders = headers ++ headersRow - val allParams = params ++ paramsRow - val allHiddenParams = hiddenParams ++ hiddenParamsRow - - logger.info(s"headers:{$allHeaders},params:{$allParams}") - val response = Try(requestHandler(row, session, url, allHeaders, allParams ++ allHiddenParams, method, connectTimeout, readTimeout)) + val response = Try(requestHandler(row, session, url, headers, params ++ hiddenParams, method, connectTimeout, readTimeout)) val elapsedTime = System.currentTimeMillis() - startTime val id = row.getAs[Any](Alias.IdCol).toString - - val data = method.toUpperCase match { - case "PUT" | "POST" | "DELETE" => row.getAs[Any](Alias.DataCol).toString - case _ => "" - } + val data = row.getAs[Any](Alias.DataCol).toString def getResponse(r: requests.Response) = Response( id, @@ -126,25 +99,25 @@ private[almaren] case class HTTP( case Failure(re: RequestFailedException) => getResponse(re.response) case Failure(f) => { logger.error("Almaren HTTP Request Error", f) - Response(id, `__ERROR__` = Some(f.getMessage), `__ELAPSED_TIME__` = elapsedTime, `__URL__` = url,`__DATA__` = data) + Response(id, `__ERROR__` = Some(f.getMessage), `__ELAPSED_TIME__` = elapsedTime, `__URL__` = url, `__DATA__` = data) } } } } private[almaren] case class HTTPBatch( - url: String, - headers: Map[String, String], - params: Map[String, String], - hiddenParams: Map[String, String], - method: String, - requestHandler: (String, Session, String, Map[String, String], Map[String, String], String, Int, Int) => requests.Response, - session: () => requests.Session, - connectTimeout: Int, - readTimeout: Int, - batchSize: Int, - batchDelimiter: (Seq[Row]) => String - ) extends Main { + url: String, + headers: Map[String, String], + params: Map[String, String], + hiddenParams: Map[String, String], + method: String, + requestHandler: (String, Session, String, Map[String, String], Map[String, String], String, Int, Int) => requests.Response, + session: () => requests.Session, + connectTimeout: Int, + readTimeout: Int, + batchSize: Int, + batchDelimiter: (Seq[Row]) => String + ) extends Main { override def core(df: DataFrame): DataFrame = { logger.info(s"url:{$url}, headers:{$headers},params:{$params}, method:{$method}, connectTimeout:{$connectTimeout}, readTimeout{$readTimeout}, batchSize:{$batchSize}") @@ -202,16 +175,16 @@ private[almaren] case class HTTPBatch( private[almaren] trait HTTPConnector extends Core { def http( - headers: Map[String, String] = Map(), - params: Map[String, String] = Map(), - hiddenParams: Map[String, String] = Map(), - method: String, - requestHandler: (Row, Session, String, Map[String, String], Map[String, String], String, Int, Int) => requests.Response = HTTPConn.defaultHandler, - session: () => requests.Session = HTTPConn.defaultSession, - connectTimeout: Int = 60000, - readTimeout: Int = 1000, - threadPoolSize: Int = 1, - batchSize: Int = 5000): Option[Tree] = + headers: Map[String, String] = Map(), + params: Map[String, String] = Map(), + hiddenParams: Map[String, String] = Map(), + method: String, + requestHandler: (Row, Session, String, Map[String, String], Map[String, String], String, Int, Int) => requests.Response = HTTPConn.defaultHandler, + session: () => requests.Session = HTTPConn.defaultSession, + connectTimeout: Int = 60000, + readTimeout: Int = 1000, + threadPoolSize: Int = 1, + batchSize: Int = 5000): Option[Tree] = HTTP( headers, params, @@ -226,18 +199,18 @@ private[almaren] trait HTTPConnector extends Core { ) def httpBatch( - url: String, - headers: Map[String, String] = Map(), - params: Map[String, String] = Map(), - hiddenParams: Map[String, String] = Map(), - method: String, - requestHandler: (String, Session, String, Map[String, String], Map[String, String], String, Int, Int) => requests.Response = HTTPConn.defaultHandlerBatch, - session: () => requests.Session = HTTPConn.defaultSession, - connectTimeout: Int = 60000, - readTimeout: Int = 1000, - batchSize: Int = 5000, - batchDelimiter: (Seq[Row]) => String = HTTPConn.defaultBatchDelimiter - ): Option[Tree] = + url: String, + headers: Map[String, String] = Map(), + params: Map[String, String] = Map(), + hiddenParams: Map[String, String] = Map(), + method: String, + requestHandler: (String, Session, String, Map[String, String], Map[String, String], String, Int, Int) => requests.Response = HTTPConn.defaultHandlerBatch, + session: () => requests.Session = HTTPConn.defaultSession, + connectTimeout: Int = 60000, + readTimeout: Int = 1000, + batchSize: Int = 5000, + batchDelimiter: (Seq[Row]) => String = HTTPConn.defaultBatchDelimiter + ): Option[Tree] = HTTPBatch( url, headers, @@ -283,4 +256,4 @@ object HTTPConn { val defaultSession = () => requests.Session() implicit class HTTPImplicit(val container: Option[Tree]) extends HTTPConnector -} \ No newline at end of file +}