Skip to content

Commit

Permalink
Base feature (#1)
Browse files Browse the repository at this point in the history
* Base feature
* Update sbt version
* #1 github workflow to build, sbt-header check + github workflow
* #1 headers update (newline) to be compatible with sbt header plugin
* #1 Constants reverted to Enceladus values (test passing and Enceladus-compatibility), created issue #5 to follow up
* #1 testfix (missing test config, typeName wrapper - typeOf[Boolean].toString returning `Boolean` vs `scala.Boolean` + test)
* #1 cleanup, SectionSuite added (originating in Enceladus). `FieldValidationFailure` renamed to `FieldValidationIssue`

* #1 sbt `in` -> `/`
* #1 removed unused test resources
* #1 added header checking for src/test/scala, too. + header NL fixes, build name = small caps,
* #1 assembly removed (plugin + config)
* #1 StructFieldImplicitsSuite added, `StandardizationRerunSuite` renamed to `StandardizationCsvSuite`
* #1 Enceladus#677 changed to #7
* `StdInterpreterSuite` and `StandardizationInterpreterSuite` merged into the latter.
* #1 `StandardizationInterpreterSuite` test fix

Co-authored-by: Daniel Kavan <dk1844@gmail.com>
  • Loading branch information
Zejnilovic and dk1844 authored Dec 13, 2021
1 parent f067300 commit ed7c2aa
Show file tree
Hide file tree
Showing 133 changed files with 16,600 additions and 0 deletions.
36 changes: 36 additions & 0 deletions .editorconfig
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
#
# Copyright 2021 ABSA Group Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# top-most EditorConfig file
root = true

[*]
charset = utf-8
end_of_line = lf
trim_trailing_whitespace = true

[*.xml]
indent_size = 4
indent_style = space
insert_final_newline = true

[*.{java,scala,js,json,css}]
indent_size = 2
indent_style = space
insert_final_newline = true
max_line_length = 120

[*.md]
trim_trailing_whitespace = false
41 changes: 41 additions & 0 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
#
# Copyright 2021 ABSA Group Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

name: Build

on:
push:
branches: [ main, develop, master ]
pull_request:
branches: [ master, develop ]
types: [ assigned, opened, synchronize, reopened, labeled ]

jobs:
test-sbt:
runs-on: ubuntu-latest
strategy:
fail-fast: false
name: SBT Test
steps:
- name: Checkout code
uses: actions/checkout@v2
- uses: coursier/cache-action@v5
- name: Setup Scala
uses: olafurpg/setup-scala@v10
with:
java-version: "adopt@1.8"
- name: Build and run tests
run: sbt test
41 changes: 41 additions & 0 deletions .github/workflows/licence_check.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
#
# Copyright 2021 ABSA Group Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

name: License Check

on:
push:
branches: [ main, develop, master ]
pull_request:
branches: [ master ]
types: [ assigned, opened, synchronize, reopened, labeled ]

jobs:
license-test:
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v2
- name: Setup Scala
uses: olafurpg/setup-scala@v10
with:
java-version: "adopt@1.8"
# note, that task "headerCheck" defaults to just "compile:headerCheck" - see https://github.com/sbt/sbt-header/issues/14
- name: SBT src licence header check
run: sbt Compile/headerCheck
- name: SBT test licence header check
run: sbt Test/headerCheck

1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
Expand Down
41 changes: 41 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
 * Copyright 2021 ABSA Group Limited
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */


// Project identity. `name` is set at ThisBuild scope; see the lint exclusion below.
ThisBuild / name := "standardization"
ThisBuild / organization := "za.co.absa"
ThisBuild / version := "0.0.1-SNAPSHOT"
ThisBuild / scalaVersion := "2.11.12"

// Spark is "provided": it is expected on the runtime classpath (cluster), not bundled.
libraryDependencies ++= List(
  "org.apache.spark" %% "spark-core" % "2.4.7" % "provided",
  "org.apache.spark" %% "spark-sql" % "2.4.7" % "provided",
  "za.co.absa" %% "spark-hats" % "0.2.2",
  "za.co.absa" %% "spark-hofs" % "0.4.0",
  "org.scalatest" %% "scalatest" % "3.2.2" % Test,
  "com.typesafe" % "config" % "1.4.1"
)

// Tests share a SparkSession; running suites in parallel would make them interfere.
Test / parallelExecution := false

// licenceHeader check: settings consumed by the sbt-header plugin (headerCheck task).

ThisBuild / organizationName := "ABSA Group Limited"
ThisBuild / startYear := Some(2021)
ThisBuild / licenses += "Apache-2.0" -> url("https://www.apache.org/licenses/LICENSE-2.0.txt")

// linting
Global / excludeLintKeys += ThisBuild / name // will be used in publish, todo #3 - confirm if lint ignore is still needed
1 change: 1 addition & 0 deletions project/build.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
sbt.version=1.5.5
1 change: 1 addition & 0 deletions project/plugins.sbt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
addSbtPlugin("de.heikoseeberger" % "sbt-header" % "5.6.0")
31 changes: 31 additions & 0 deletions src/main/resources/reference.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# Copyright 2021 ABSA Group Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Configuration added here is considered the application default and it will be used
# for keys that are not specified in the provided 'application.conf' or system properties.
# Here is the precedence of configuration (top ones have higher precedence):
# 1. System Properties (e.g. passed as '-Dkey=value')
# 2. application.conf (e.g. provided as '-Dconfig.file=...')
# 3. reference.conf

# 'enceladus_record_id' with an id can be added containing either true UUID, always the same IDs (row-hash-based) or the
# column will not be added at all. Allowed values: "uuid", "stableHashId", "none"
standardization.recordId.generation.strategy="uuid"

# system-wide time zone
timezone="UTC"

standardization.testUtils.sparkTestBaseMaster="local[4]"

standardization.failOnInputNotPerSchema=false
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* Copyright 2021 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.standardization

import org.apache.spark.sql.{Column, Dataset, Row, SparkSession}
import org.apache.spark.sql.api.java.UDF1
import org.apache.spark.sql.functions.{callUDF, col, struct}
import org.apache.spark.sql.types.{ArrayType, DataType, StructType}
import org.slf4j.LoggerFactory
import za.co.absa.standardization.schema.SchemaUtils

/**
 * Helpers for manipulating (possibly nested) array columns of a Spark `Dataset[Row]`.
 *
 * Originates from the Enceladus project (see commit message); behavior is kept
 * Enceladus-compatible.
 */
object ArrayTransformations {
  private val logger = LoggerFactory.getLogger(this.getClass)

  /**
   * Flattens one level of a nested array column (`array<array<T>>` -> `array<T>`).
   *
   * If `colName` is not an `ArrayType`, or is an array whose element type is not
   * itself an array, the original dataset is returned unchanged (with an info log).
   * Otherwise a one-off UDF is registered (named after the column, dots replaced by
   * underscores, suffixed with the current time in millis to avoid name clashes) that
   * concatenates the inner sequences, dropping `null` inner sequences; a `null` outer
   * value stays `null`. The column is then replaced in place via [[nestedWithColumn]].
   *
   * NOTE(review): the registered UDF is never unregistered, so repeated calls
   * accumulate entries in the session's UDF registry — presumably acceptable here.
   *
   * @param df      input dataset
   * @param colName dot-separated path of the (nested) array column to flatten
   * @throws Error  if `colName` does not exist in the dataset's schema
   * @return the dataset with the column flattened by one nesting level
   */
  def flattenArrays(df: Dataset[Row], colName: String)(implicit spark: SparkSession): Dataset[Row] = {
    val typ = SchemaUtils.getFieldType(colName, df.schema).getOrElse(throw new Error(s"Field $colName does not exist in ${df.schema.printTreeString()}"))
    if (!typ.isInstanceOf[ArrayType]) {
      logger.info(s"Field $colName is not an ArrayType, returning the original dataset!")
      df
    } else {
      val arrType = typ.asInstanceOf[ArrayType]
      if (!arrType.elementType.isInstanceOf[ArrayType]) {
        logger.info(s"Field $colName is not a nested array, returning the original dataset!")
        df
      } else {
        // Unique UDF name per invocation; dots are illegal in UDF names.
        val udfName = colName.replace('.', '_') + System.currentTimeMillis()

        spark.udf.register(udfName, new UDF1[Seq[Seq[Row]], Seq[Row]] {
          def call(t1: Seq[Seq[Row]]): Seq[Row] = if (t1 == null) null.asInstanceOf[Seq[Row]] else t1.filter(_ != null).flatten // scalastyle:ignore null
        }, arrType.elementType)

        nestedWithColumn(df)(colName, callUDF(udfName, col(colName)))
      }
    }

  }

  /**
   * Equivalent of `Dataset.withColumn` that also works for dot-separated nested
   * column paths: every struct on the path to `columnName` is rebuilt with the
   * target leaf replaced (or appended) by `column`, while all sibling fields are
   * preserved in their original order.
   *
   * Limitations: arrays on the path cannot be reconstructed — an
   * `IllegalStateException` is thrown if one is encountered. Missing intermediate
   * structs on the path are created as new structs.
   *
   * @param ds         input dataset
   * @param columnName dot-separated path of the column to set
   * @param column     the column expression to place at that path
   */
  def nestedWithColumn(ds: Dataset[Row])(columnName: String, column: Column): Dataset[Row] = {
    val toks = columnName.split("\\.").toList

    // Recursively rebuilds the struct expression for the current path segment.
    // `tokens` are the remaining path segments, `pathAcc` the segments consumed so far.
    def helper(tokens: List[String], pathAcc: Seq[String]): Column = {
      val currPath = (pathAcc :+ tokens.head).mkString(".")
      val topType = SchemaUtils.getFieldType(currPath, ds.schema)

      // got a match
      if (currPath == columnName) {
        column as tokens.head
      } // some other attribute
      else if (!columnName.startsWith(currPath)) {
        arrCol(currPath)
      } // partial match, keep going
      else if (topType.isEmpty) {
        // Path does not exist yet: synthesize the missing struct level.
        struct(helper(tokens.tail, pathAcc ++ List(tokens.head))) as tokens.head
      } else {
        topType.get match {
          case s: StructType =>
            val cols = s.fields.map(_.name)
            // If the next path segment is a new field, append it after existing ones.
            val fields = if (tokens.size > 1 && !cols.contains(tokens(1))) {
              cols :+ tokens(1)
            } else {
              cols
            }
            // `.distinct` guards against the next token duplicating an existing field name.
            struct(fields.map(field => helper((List(field) ++ tokens.tail).distinct, pathAcc :+ tokens.head) as field): _*) as tokens.head
          case _: ArrayType => throw new IllegalStateException("Cannot reconstruct array columns. Please use this within arrayTransform.")
          case _: DataType => arrCol(currPath) as tokens.head
        }
      }
    }

    ds.withColumn(toks.head, helper(toks, Seq()))
  }

  /**
   * Builds a [[Column]] reference from a path that may contain array-index
   * notation, e.g. `"a.b[0].c"`. Bracketed indices are rewritten to dot tokens
   * (`a.b.0.c`); numeric tokens are then applied as element/index access and
   * non-numeric tokens as field access.
   */
  def arrCol(any: String): Column = {
    val toks = any.replaceAll("\\[(\\d+)\\]", "\\.$1").split("\\.")
    toks.tail.foldLeft(col(toks.head)){
      case (acc, tok) =>
        if (tok.matches("\\d+")) {
          acc(tok.toInt)
        } else {
          acc(tok)
        }
    }
  }
}
Loading

0 comments on commit ed7c2aa

Please sign in to comment.