diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..613d4a3
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,60 @@
+RemoteSystemsTempFiles/
+Servers/
+target/
+logs/
+.metadata/
+bin/
+tmp/
+*.tmp
+*.bak
+*.swp
+*~.nib
+local.properties
+.settings/
+.loadpath
+.recommenders
+.idea/
+.project
+classes/
+.classpath
+*.iml
+*_SUCCESS*
+*.crc
+
+# External tool builders
+.externalToolBuilders/
+
+# Locally stored "Eclipse launch configurations"
+*.launch
+
+# PyDev specific (Python IDE for Eclipse)
+*.pydevproject
+
+# CDT-specific (C/C++ Development Tooling)
+.cproject
+
+# Java annotation processor (APT)
+.factorypath
+
+# PDT-specific (PHP Development Tools)
+.buildpath
+
+# sbteclipse plugin
+.target
+
+# Tern plugin
+.tern-project
+
+# TeXlipse plugin
+.texlipse
+
+# STS (Spring Tool Suite)
+.springBeans
+
+# Code Recommenders
+.recommenders/
+
+# Scala IDE specific (Scala & Java development for Eclipse)
+.cache-main
+.scala_dependencies
+.worksheet
diff --git a/README.md b/README.md
index a4d0f2a..56e550a 100644
--- a/README.md
+++ b/README.md
@@ -1,2 +1,53 @@
# big-data-insights-scala
-personal solutions to big data problem scenarios using scala
+Personal solutions to big data problem scenarios using Scala.
+
+## Project Structure
+Each package is based on a problem scenario.
+
+Each problem scenario will contain a main class in the *com.jwk.development.big_data_insights.scala.products.driver* package
+
+Each problem has a detailed problem scenario and a result sheet.
+
+### 1. Product Data for a pen company
+
+ Problem: Given CSV files containing product information from a pen company, provide insights using big data technologies
+
+ Package name: *com.jwk.development.big_data_insights.scala.products.problem_scenario_One*
+
+ Driver/Main class: *com.jwk.development.big_data_insights.scala.products.driver.run_problem_scenario_one*
+
+ Link to result sheet and detailed problem scenarios:
+
+ [Part One]()
+
+ [Part Two]()
+
+ [Part Three]()
+
+### 2. Patient Data
+
+ Problem: **
+
+ Package name: **
+
+ Driver/Main class: **
+
+ Link to result sheet and detailed problem scenarios:
+
+
+## Troubleshooting
+1. If the following error occurs when running an application: *A master URL must be set in your configuration*
+ ```
+ Exception in thread "main" java.lang.ExceptionInInitializerError
+ at com.jwk.development.big_data_insights.scala.products.driver.problem_scenario_1.main(problem_scenario_1.scala)
+ Caused by: org.apache.spark.SparkException: A master URL must be set in your configuration
+ ```
+
+ Solution:
+
+ Add the following VM option to your run configurations
+ ```
+ -Dspark.master=local
+ ```
+ [link to setting spark master to local in intellij]()
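+
+   Alternatively, a minimal sketch (assuming you build the session yourself): the master can be set programmatically when creating the `SparkSession`:
+   ```
+   import org.apache.spark.sql.SparkSession
+
+   // sketch: set the master explicitly for local runs instead of passing -Dspark.master
+   val spark = SparkSession.builder
+     .appName("products_application")
+     .master("local[*]")
+     .getOrCreate()
+   ```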
+
diff --git a/build.sbt b/build.sbt
new file mode 100644
index 0000000..a567064
--- /dev/null
+++ b/build.sbt
@@ -0,0 +1,34 @@
+name := "big-data-insights-scala"
+
+version := "1.0"
+
+scalaVersion := "2.11.8"
+
+libraryDependencies ++= Seq(
+ "org.apache.hadoop" % "hadoop-client" % "2.7.3",
+  "org.apache.spark" % "spark-core_2.11" % "2.1.0",
+  "org.apache.spark" % "spark-sql_2.11" % "2.1.0",
+ "org.apache.spark" % "spark-hive_2.11" % "2.1.0",
+ "com.databricks" % "spark-avro_2.11" % "3.2.0",
+  "com.databricks" % "spark-csv_2.11" % "1.3.0",
+ "org.scala-lang" % "scala-library" % "2.11.8",
+ "org.scala-lang" % "scala-reflect" % "2.11.8",
+ "com.typesafe" % "config" % "1.3.1",
+ "org.apache.logging.log4j" %% "log4j-api-scala" % "2.8.1",
+ "org.apache.logging.log4j" % "log4j-core" % "2.8.1",
+ "org.apache.kafka" %% "kafka" % "0.9.0.2.3.4.51-1"
+
+)
+//use external repositories
+resolvers += "HortonWorksRepo" at "http://repo.hortonworks.com/content/repositories/releases/"
+
+parallelExecution in Test := false
+
+
+initialCommands := "import org.test._"
+
+//clean operations
+cleanFiles += baseDirectory { base => base / "build" }.value
+cleanFiles += baseDirectory { base => base / "metastore_db" }.value
+
+//assembly-settings
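+
+//a minimal sketch of sbt-assembly merge settings (assumes the sbt-assembly 0.12.x plugin
+//declared in project/assembly.sbt); adjust the merge strategy for your own dependencies
+assemblyMergeStrategy in assembly := {
+  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
+  case _ => MergeStrategy.first
+}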
\ No newline at end of file
diff --git a/config/test_linux/application.conf b/config/test_linux/application.conf
new file mode 100644
index 0000000..e69de29
diff --git a/config/test_linux/log4j.properties b/config/test_linux/log4j.properties
new file mode 100644
index 0000000..117529f
--- /dev/null
+++ b/config/test_linux/log4j.properties
@@ -0,0 +1,12 @@
+# Set root logger level to ERROR and its only appender to A1.
+log4j.rootLogger=ERROR, A1
+# If we get chained appenders, this stops the message being written multiple times
+log4j.additivity.org.apache=false
+log4j.additivity.xdasLogger=false
+# A1 is set to be a ConsoleAppender.
+log4j.appender.A1=org.apache.log4j.ConsoleAppender
+log4j.appender.A1.Target=System.out
+# A1 uses PatternLayout.
+log4j.appender.A1.layout=org.apache.log4j.PatternLayout
+log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+
diff --git a/config/test_windows/application.conf b/config/test_windows/application.conf
new file mode 100644
index 0000000..e69de29
diff --git a/config/test_windows/log4j.properties b/config/test_windows/log4j.properties
new file mode 100644
index 0000000..117529f
--- /dev/null
+++ b/config/test_windows/log4j.properties
@@ -0,0 +1,12 @@
+# Set root logger level to ERROR and its only appender to A1.
+log4j.rootLogger=ERROR, A1
+# If we get chained appenders, this stops the message being written multiple times
+log4j.additivity.org.apache=false
+log4j.additivity.xdasLogger=false
+# A1 is set to be a ConsoleAppender.
+log4j.appender.A1=org.apache.log4j.ConsoleAppender
+log4j.appender.A1.Target=System.out
+# A1 uses PatternLayout.
+log4j.appender.A1.layout=org.apache.log4j.PatternLayout
+log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+
diff --git a/insight_data/patients.csv b/insight_data/patients.csv
new file mode 100644
index 0000000..65b0e29
--- /dev/null
+++ b/insight_data/patients.csv
@@ -0,0 +1,7 @@
+patientID,name ,address ,dateOfBirth,lastVisitDate
+1001 ,Homer Simpson ,"123 Blue St.,Los Angeles, CA 12345" ,1989-12-31 ,2017-01-21
+1002 ,Peter Griffin ,"234 Brown St., San Fransisco, CA 23456",1950-01-30 ,2015-04-18
+1003 ,Hubert J. Fansworth,"546 Red Dr., Sacramento, CA 54678" ,1978-08-21 ,2017-02-14
+1004 ,Marge Simpson ,"123 Blue St.,Los Angeles, CA 12345" ,1990-03-18 ,2016-02-15
+1005 ,Bender Rodriguez ,"127 Brown St., Charlotte, NC 28223" ,1986-12-31 ,2013-12-14
+1006 ,Turanga Leela ,"128 Brown St., Charlotte, NC 28223" ,1978-08-21 ,2012-09-15
diff --git a/insight_data/products.csv b/insight_data/products.csv
new file mode 100644
index 0000000..f535003
--- /dev/null
+++ b/insight_data/products.csv
@@ -0,0 +1,11 @@
+productID productCode name quantity price supplierid
+1001 PEN Pen Red 5000 1.23 501
+1002 PEN Pen Blue 8001 1.25 501
+1003 PEN Pen Black 2000 1.25 501
+1004 PEC Pencil 2B 10000 0.48 502
+1005 PEC Pencil 2H 8000 0.49 502
+1006 PEC Pencil HB 0 9999.99 502
+2001 PEC Pencil 3B 500 0.52 501
+2002 PEC Pencil 4B 200 0.62 501
+2003 PEC Pencil 5B 100 0.73 501
+2004 PEC Pencil 6B 500 0.47 502
diff --git a/insight_data/products_suppliers.csv b/insight_data/products_suppliers.csv
new file mode 100644
index 0000000..da4082b
--- /dev/null
+++ b/insight_data/products_suppliers.csv
@@ -0,0 +1,6 @@
+productID,supplierID
+2001 ,501
+2002 ,501
+2003 ,501
+2004 ,502
+2001 ,503
diff --git a/insight_data/supplier.csv b/insight_data/supplier.csv
new file mode 100644
index 0000000..5d2a943
--- /dev/null
+++ b/insight_data/supplier.csv
@@ -0,0 +1,6 @@
+supplierID,name ,phone
+501 ,ABC Traders,88881111
+502 ,XYZ Company,88882222
+503 ,QQ Corp ,88883333
+504 ,DEG LLC ,88884444
+505 ,FGH Limited,88885555
diff --git a/problem_scenarios/scala_problem_scenario_1-products.md b/problem_scenarios/scala_problem_scenario_1-products.md
new file mode 100644
index 0000000..4f2dc24
--- /dev/null
+++ b/problem_scenarios/scala_problem_scenario_1-products.md
@@ -0,0 +1,259 @@
+# Problem Scenario 1
+This problem has three parts:
+
+1. [Part One](#part-one)
+
+2. [Part Two](#part-two)
+
+3. [Part Three](#part-three)
+
+## Part One
+You have the following file: [/insight_data/products.csv](/insight_data/products.csv)
+
+```
+products.csv
+
++---------+-----------+---------+--------+-------+
+|productID|productCode| name|quantity| price|
++---------+-----------+---------+--------+-------+
+| 1001| PEN| Pen Red| 5000| 1.23|
+| 1002| PEN| Pen Blue| 8001| 1.25|
+| 1003| PEN|Pen Black| 2000| 1.25|
+| 1004| PEC|Pencil 2B| 10000| 0.48|
+| 1005| PEC|Pencil 2H| 8000| 0.49|
+| 1006| PEC|Pencil HB| 0|9999.99|
+| 2001| PEC|Pencil 3B| 500| 0.52|
+| 2002| PEC|Pencil 4B| 200| 0.62|
+| 2003| PEC|Pencil 5B| 100| 0.73|
+| 2004| PEC|Pencil 6B| 500| 0.47|
++---------+-----------+---------+--------+-------+
+```
+Using Spark and Spark SQL, perform the following tasks:
+1. Load the CSV file into a DataFrame
+
+ Using schema:
+ ```
+ val schema =
+ StructType(
+ Array(
+ StructField("productID", IntegerType, false),
+ StructField("productCode", StringType, false),
+ StructField("name", StringType, false),
+ StructField("quantity", IntegerType, false),
+ StructField("price", FloatType, false)
+ )
+ )
+ ```
+ approach
+ ```
+ val productDF = sqlContext.read.format("com.databricks.spark.csv").option("delimiter","\t").option("header","true").option("inferSchema", "false").schema(schema).load(path)
+ ```
+
+2. Create a global temporary view named `products` from the DataFrame
+
+ `productDF.createGlobalTempView("products")`
+
+3. Using the global temporary view (queries reference it as `global_temp.products`), perform the tasks below
+
+4. Select and show all the records with quantity >= 5000 and name starts with 'Pen'
+ ```
+ +---------+-----------+---------+--------+-----+
+ |productID|productCode| name|quantity|price|
+ +---------+-----------+---------+--------+-----+
+ | 1001| PEN| Pen Red| 5000| 1.23|
+ | 1002| PEN| Pen Blue| 8001| 1.25|
+ | 1004| PEC|Pencil 2B| 10000| 0.48|
+ | 1005| PEC|Pencil 2H| 8000| 0.49|
+ +---------+-----------+---------+--------+-----+
+ ```
+
+5. Select and show all the records with quantity >= 5000, price is less than 1.24 and name starts with 'Pen'
+ ```
+ +---------+-----------+---------+--------+-----+
+ |productID|productCode| name|quantity|price|
+ +---------+-----------+---------+--------+-----+
+ | 1001| PEN| Pen Red| 5000| 1.23|
+ | 1004| PEC|Pencil 2B| 10000| 0.48|
+ | 1005| PEC|Pencil 2H| 8000| 0.49|
+ +---------+-----------+---------+--------+-----+
+ ```
+
+6. Select and show all the records that do not have quantity >= 5000 with a name starting with 'Pen' (the negation of task 4)
+ ```
+ +---------+-----------+---------+--------+-------+
+ |productID|productCode| name|quantity| price|
+ +---------+-----------+---------+--------+-------+
+ | 1003| PEN|Pen Black| 2000| 1.25|
+ | 1006| PEC|Pencil HB| 0|9999.99|
+ | 2001| PEC|Pencil 3B| 500| 0.52|
+ | 2002| PEC|Pencil 4B| 200| 0.62|
+ | 2003| PEC|Pencil 5B| 100| 0.73|
+ | 2004| PEC|Pencil 6B| 500| 0.47|
+ +---------+-----------+---------+--------+-------+
+ ```
+
+7. Select and show all the products whose name is 'Pen Red' or 'Pen Black'
+ ```
+ +---------+-----------+---------+--------+-----+
+ |productID|productCode| name|quantity|price|
+ +---------+-----------+---------+--------+-----+
+ | 1001| PEN| Pen Red| 5000| 1.23|
+ | 1003| PEN|Pen Black| 2000| 1.25|
+ +---------+-----------+---------+--------+-----+
+ ```
+8. Select and show all the products with price BETWEEN 1.0 AND 2.0 and quantity BETWEEN 1000 AND 2000
+ ```
+ +---------+-----------+---------+--------+-----+
+ |productID|productCode| name|quantity|price|
+ +---------+-----------+---------+--------+-----+
+ | 1003| PEN|Pen Black| 2000| 1.25|
+ +---------+-----------+---------+--------+-----+
+ ```
+
+9. Select all the products whose product code is null
+
+ ```
+ +---------+-----------+----+--------+-----+
+ |productID|productCode|name|quantity|price|
+ +---------+-----------+----+--------+-----+
+ +---------+-----------+----+--------+-----+
+ ```
+
+10. Select all the products whose name starts with Pen, ordered by price in descending order.
+
+ ```
+ +---------+-----------+---------+--------+-------+
+ |productID|productCode| name|quantity| price|
+ +---------+-----------+---------+--------+-------+
+ | 1006| PEC|Pencil HB| 0|9999.99|
+ | 1003| PEN|Pen Black| 2000| 1.25|
+ | 1002| PEN| Pen Blue| 8001| 1.25|
+ | 1001| PEN| Pen Red| 5000| 1.23|
+ | 2003| PEC|Pencil 5B| 100| 0.73|
+ | 2002| PEC|Pencil 4B| 200| 0.62|
+ | 2001| PEC|Pencil 3B| 500| 0.52|
+ | 1005| PEC|Pencil 2H| 8000| 0.49|
+ | 1004| PEC|Pencil 2B| 10000| 0.48|
+ | 2004| PEC|Pencil 6B| 500| 0.47|
+ +---------+-----------+---------+--------+-------+
+ ```
+
+11. Select all the products whose name starts with Pen, ordered by price in descending order and quantity in ascending order.
+
+ ```
+ +---------+-----------+---------+--------+-------+
+ |productID|productCode| name|quantity| price|
+ +---------+-----------+---------+--------+-------+
+ | 1006| PEC|Pencil HB| 0|9999.99|
+ | 1003| PEN|Pen Black| 2000| 1.25|
+ | 1002| PEN| Pen Blue| 8001| 1.25|
+ | 1001| PEN| Pen Red| 5000| 1.23|
+ | 2003| PEC|Pencil 5B| 100| 0.73|
+ | 2002| PEC|Pencil 4B| 200| 0.62|
+ | 2001| PEC|Pencil 3B| 500| 0.52|
+ | 1005| PEC|Pencil 2H| 8000| 0.49|
+ | 1004| PEC|Pencil 2B| 10000| 0.48|
+ | 2004| PEC|Pencil 6B| 500| 0.47|
+ +---------+-----------+---------+--------+-------+
+ ```
+
+12. Select top 2 products by price
+
+ ```
+ +---------+-----------+---------+--------+-------+
+ |productID|productCode| name|quantity| price|
+ +---------+-----------+---------+--------+-------+
+ | 1006| PEC|Pencil HB| 0|9999.99|
+ | 1002| PEN| Pen Blue| 8001| 1.25|
+ +---------+-----------+---------+--------+-------+
+ ```
+
+
+13. Select all the columns from the product table, aliasing the output headers as follows:
+    productID AS ID, productCode AS Code, name AS Description, price AS 'Unit Price'
+ ```
+
+ ```
+
+14. Select productCode and name, separated by '-', with the output header named ProductDescription
+
+ ```
+ ```
+
+15. Select all distinct prices.
+
+ ```
+ ```
+
+16. Select distinct price and name combinations
+
+ ```
+ ```
+
+17. Select all price data sorted by the combination of productCode and productID
+
+ ```
+ ```
+
+18. Count the number of products.
+
+ ```
+ ```
+
+19. Count the number of products for each product code
+
+ ```
+ ```
+
+
+20. Select the maximum, minimum, average, standard deviation, and total (sum) of quantity
+
+21. Select minimum and maximum price for each product code.
+
+22. Select the maximum, minimum, average, standard deviation, and total quantity for each product code; make sure the average and standard deviation are rounded to at most two decimal places (see the sketch after this list).
+
+23. Select the product code and average price only where the product count is greater than or equal to 3
+
+24. Select the maximum, minimum, average, and total of all the products for each code. Also produce the same figures across all products.
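+
+A minimal sketch for the grouped aggregates in task 22, assuming the `global_temp.products` view created earlier and the standard Spark SQL aggregate functions (`STDDEV`, `ROUND`):
+
+```
+//sketch: per-code aggregates rounded to two decimals (task 22)
+val aggByCode = spark.sql(
+  """SELECT productCode,
+    |       MAX(quantity)              AS max_quantity,
+    |       MIN(quantity)              AS min_quantity,
+    |       ROUND(AVG(quantity), 2)    AS avg_quantity,
+    |       ROUND(STDDEV(quantity), 2) AS stddev_quantity,
+    |       SUM(quantity)              AS total_quantity
+    |FROM global_temp.products
+    |GROUP BY productCode""".stripMargin)
+aggByCode.show()
+```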
+
+
+## Part Two
+You have been provided two additional files:
+
+1. [link to insight_data/products_suppliers.csv]()
+
+2. [link to insight_data/supplier.csv]()
+
+
+Now accomplish the following queries.
+
+1. Using Spark SQL, select each product, its price, and its supplier name where the product price is less than 0.6
+
+
+2. It is possible that the same product is supplied by multiple suppliers. Find each product and its price according to each supplier.
+
+3. Find the names of all suppliers who supply 'Pencil 3B'
+
+4. Find all the products that are supplied by ABC Traders (a join sketch follows this list)
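+
+A minimal sketch for the Part Two joins, assuming the three CSVs have been loaded into DataFrames and registered as temporary views named `products`, `products_suppliers` and `suppliers` (column names taken from the CSV headers):
+
+```
+//sketch: join products to suppliers and filter on price (query 1)
+val cheapWithSupplier = spark.sql(
+  """SELECT p.name AS product, p.price, s.name AS supplier
+    |FROM products p
+    |JOIN products_suppliers ps ON p.productID = ps.productID
+    |JOIN suppliers s ON ps.supplierID = s.supplierID
+    |WHERE p.price < 0.6""".stripMargin)
+cheapWithSupplier.show()
+```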
+
+
+## Part Three
+1. Create a Hive ORC table using SparkSQL
+
+2. Load the data into the Hive table.
+
+3. Create a Hive Parquet table using SparkSQL and load data into it (a sketch follows this list).
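+
+A minimal sketch, mirroring the `writeDataFrameToHiveTable` helper in `problem_scenario_One.scala`; it assumes `productDF` is the DataFrame loaded from products.csv and that the SparkSession was created with Hive support:
+
+```
+import org.apache.spark.sql.SaveMode
+
+//sketch: persist the products DataFrame as Hive tables in ORC and Parquet format
+productDF.write.mode(SaveMode.Overwrite).format("orc").saveAsTable("product_orc_table")
+productDF.write.mode(SaveMode.Overwrite).format("parquet").saveAsTable("product_parquet_table")
+```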
+
+
+## Developer Notes
+Add the following VM options to set the Spark master and the Hadoop home directory:
+```
+-Dspark.master=local
+-Dhadoop.home.dir=C:\hadoop-2.7.4
+```
diff --git a/problem_scenarios/scala_problem_scenario_2-patient_data.md b/problem_scenarios/scala_problem_scenario_2-patient_data.md
new file mode 100644
index 0000000..719a2d1
--- /dev/null
+++ b/problem_scenarios/scala_problem_scenario_2-patient_data.md
@@ -0,0 +1,18 @@
+# Problem Scenario 2
+You have been given the following file containing patient data:
+[/insight_data/patients.csv](/insight_data/patients.csv)
+
+Accomplish the following activities:
+
+1. Find all the patients whose lastVisitDate is between '2012-09-15' and the current date
+
+2. Find all the patients who were born in 1990
+
+3. Find the age of each patient
+
+4. List the patients whose last visit was more than 60 days ago
+
+5. Select the patients who are 18 years old or younger (a query sketch follows this list)
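+
+A minimal sketch for these queries, assuming patients.csv has been loaded into a DataFrame (with dateOfBirth and lastVisitDate as date columns) and registered as a temporary view named `patients`:
+
+```
+//sketch: date-based patient queries using built-in Spark SQL functions
+val bornIn1990  = spark.sql("SELECT * FROM patients WHERE year(dateOfBirth) = 1990")
+val staleVisits = spark.sql("SELECT * FROM patients WHERE datediff(current_date(), lastVisitDate) > 60")
+val patientAges = spark.sql(
+  "SELECT name, floor(datediff(current_date(), dateOfBirth) / 365.25) AS age FROM patients")
+patientAges.show()
+```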
+
+
+
diff --git a/problem_scenarios/scala_problem_scenario_rdd_join.md b/problem_scenarios/scala_problem_scenario_rdd_join.md
new file mode 100644
index 0000000..0e89871
--- /dev/null
+++ b/problem_scenarios/scala_problem_scenario_rdd_join.md
@@ -0,0 +1,25 @@
+# Problem Scenario: RDD join
+
+You have been given the code snippet below:
+
+```
+val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
+
+val b = a.keyBy(_.length)
+
+val c = sc.parallelize(List("dog", "cat", "gnu", "salmon", "rabbit"), 3)
+
+val d = c.keyBy(_.length)
+
+```
+
+Write a correct code snippet for operation1 which will produce the desired output, shown below (a solution sketch follows the output).
+
+```
+Array[(Int, (String, String))] =
+
+(6,(salmon,rabbit)),
+
+(3,(dog,dog)), (3,(dog,cat)), (3,(dog,gnu)), ...
+```
+
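+A minimal sketch of one possible `operation1`, assuming the reconstructed snippet above; joining the two length-keyed RDDs yields pairs keyed on word length:
+
+```
+//sketch: join the length-keyed RDDs; keys are word lengths, values are word pairs
+val operation1 = b.join(d)
+operation1.collect()
+```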
diff --git a/project/assembly.sbt b/project/assembly.sbt
new file mode 100644
index 0000000..7d2ba3f
--- /dev/null
+++ b/project/assembly.sbt
@@ -0,0 +1,2 @@
+addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.12.0")
+resolvers += Resolver.url("bintray-sbt-plugins", url("http://dl.bintray.com/sbt/sbt-plugin-releases"))(Resolver.ivyStylePatterns)
\ No newline at end of file
diff --git a/project/build.properties b/project/build.properties
new file mode 100644
index 0000000..64317fd
--- /dev/null
+++ b/project/build.properties
@@ -0,0 +1 @@
+sbt.version=0.13.15
diff --git a/scripts/create_products_parquet_table_in_hive.sh b/scripts/create_products_parquet_table_in_hive.sh
new file mode 100644
index 0000000..b164280
--- /dev/null
+++ b/scripts/create_products_parquet_table_in_hive.sh
@@ -0,0 +1,10 @@
+#!/usr/bin/env bash
+
+hive -e "
+CREATE EXTERNAL TABLE products_parquet (productid int, code string, name string, quantity int, price float)
+STORED AS PARQUET
+LOCATION '/user/hive/warehouse/product_parquet_table';
+"
+
+#"select * from product_parquet_table;"
\ No newline at end of file
diff --git a/scripts/create_products_table_in_hive.sh b/scripts/create_products_table_in_hive.sh
new file mode 100644
index 0000000..b28e0e0
--- /dev/null
+++ b/scripts/create_products_table_in_hive.sh
@@ -0,0 +1,10 @@
+#!/usr/bin/env bash
+
+hive -e "
+CREATE EXTERNAL TABLE products_orc (productid int, code string, name string, quantity int, price float)
+STORED AS ORC
+LOCATION '/user/hive/warehouse/product_orc_table';
+"
+
+#select * from product_orc_table
\ No newline at end of file
diff --git a/spark-warehouse/product_orc_table/part-00000-6c329148-858e-4742-81f4-b40760a34acf.snappy.orc b/spark-warehouse/product_orc_table/part-00000-6c329148-858e-4742-81f4-b40760a34acf.snappy.orc
new file mode 100644
index 0000000..66e265b
Binary files /dev/null and b/spark-warehouse/product_orc_table/part-00000-6c329148-858e-4742-81f4-b40760a34acf.snappy.orc differ
diff --git a/spark-warehouse/product_parquet_table/part-00000-fa21c8aa-ec5b-42b9-be4b-fa3930679e14.snappy.parquet b/spark-warehouse/product_parquet_table/part-00000-fa21c8aa-ec5b-42b9-be4b-fa3930679e14.snappy.parquet
new file mode 100644
index 0000000..b1d6f1b
Binary files /dev/null and b/spark-warehouse/product_parquet_table/part-00000-fa21c8aa-ec5b-42b9-be4b-fa3930679e14.snappy.parquet differ
diff --git a/src/main/scala/com/jwk/development/big_data_insights/scala/products/driver/run_problem_scenario_one.scala b/src/main/scala/com/jwk/development/big_data_insights/scala/products/driver/run_problem_scenario_one.scala
new file mode 100644
index 0000000..fe3bc86
--- /dev/null
+++ b/src/main/scala/com/jwk/development/big_data_insights/scala/products/driver/run_problem_scenario_one.scala
@@ -0,0 +1,24 @@
+package com.jwk.development.big_data_insights.scala.products.driver
+
+import java.util.Date
+
+import com.jwk.development.big_data_insights.scala.products.problem_scenario_One
+import org.apache.spark.sql.SparkSession
+
+object run_problem_scenario_one {
+ val spark: SparkSession = SparkSession.builder.getOrCreate()
+
+ def main(args: Array[String]): Unit = {
+
+ println("Start " + this.getClass.getName() + " : " + new Date())
+ try {
+ val solution = new problem_scenario_One
+ solution.problem_scenario1("insight_data/products.csv")
+ } catch {
+ case ex: Exception => {
+ println(this.getClass.getName() + ". Error during transformation. Root cause: " + ex.getMessage())
+ }
+ }
+ println("End " + this.getClass.getName() + " : " + new Date())
+ }
+}
diff --git a/src/main/scala/com/jwk/development/big_data_insights/scala/products/problem_scenario_One.scala b/src/main/scala/com/jwk/development/big_data_insights/scala/products/problem_scenario_One.scala
new file mode 100644
index 0000000..afec91e
--- /dev/null
+++ b/src/main/scala/com/jwk/development/big_data_insights/scala/products/problem_scenario_One.scala
@@ -0,0 +1,160 @@
+package com.jwk.development.big_data_insights.scala.products
+
+import org.apache.spark.sql.types._
+import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
+
+class problem_scenario_One {
+ val spark: SparkSession = SparkSession.builder.appName("products_application").config("spark.serializer", "org.apache.spark.serializer.KryoSerializer").getOrCreate()
+ val sparkContext = spark.sparkContext
+  val sqlContext = spark.sqlContext
+
+
+ def problem_scenario1(path: String): Unit = {
+ //val tab_delimited_Header= "productID\tproductCode\tname\tquantity\tprice\tsupplierid"
+ //val comma_delimited_Header= "productID,productCode,name,quantity,price,supplierid"
+
+ //define schema of csv file
+ val schema =
+ StructType(
+ Array(
+ StructField("productID", IntegerType, false),
+ StructField("productCode", StringType, false),
+ StructField("name", StringType, false),
+ StructField("quantity", IntegerType, false),
+ StructField("price", FloatType, false)
+ )
+ )
+
+ //read csv file from directory path using schema
+ val productDF = sqlContext.read.format("com.databricks.spark.csv").option("delimiter", "\t").option("header", "true").option("inferSchema", "false").schema(schema).load(path)
+ //show first 10 records in the dataframe
+ productDF.show(10)
+
+ // Register the DataFrame as a global temporary view
+ val tempTableName = "products"
+ productDF.createGlobalTempView(tempTableName)
+ val globalTempViewName = s"global_temp.$tempTableName"
+
+ //import apache spark sql
+ import org.apache.spark.sql._
+
+ //The following answers PART ONE questions of the problem scenario.
+ //1. Select all the records with quantity >= 5000 and name starts with 'Pen'
+ println("SELECTING: all the records with quantity >= 5000 and name starts with 'Pen'")
+ val results1 = spark.sql(s"SELECT * FROM $globalTempViewName WHERE quantity >= 5000 AND name LIKE 'Pen%'")
+ println("SHOWING: all the records with quantity >= 5000 and name starts with 'Pen'")
+ results1.show()
+
+ //2. Select all the records with quantity >= 5000, price is less than 1.24 and name starts with 'Pen'
+ println("SELECTING: all the records with quantity >= 5000, price is less than 1.24 and name starts with 'Pen'")
+ val results2 = spark.sql(s"SELECT * FROM $globalTempViewName WHERE quantity >= 5000 AND price < 1.24 AND name LIKE 'Pen%'")
+ println("SHOWING: all the records with quantity >= 5000, price is less than 1.24 and name starts with 'Pen'")
+ results2.show()
+
+    //3. Select all the records that do not have quantity >= 5000 with a name starting with 'Pen'
+    println("SELECTING: all the records that do not have quantity >= 5000 with a name starting with 'Pen'")
+    val results3 = spark.sql(s"SELECT * FROM $globalTempViewName WHERE NOT (quantity >= 5000 AND name LIKE 'Pen%')")
+    println("SHOWING: all the records that do not have quantity >= 5000 with a name starting with 'Pen'")
+ results3.show()
+
+ //4. Select all the products which name is 'Pen Red', 'Pen Black'
+ println("SELECTING: all the products which name is 'Pen Red', 'Pen Black'")
+ val results4 = spark.sql(s"SELECT * FROM $globalTempViewName WHERE name IN ('Pen Red', 'Pen Black')")
+ println("SHOWING: all the products which name is 'Pen Red', 'Pen Black'")
+ results4.show()
+
+ //5. Select all the products which has price BETWEEN 1.0 AND 2.0 AND quantity BETWEEN 1000 AND 2000
+ println("SELECTING : all the products which has price BETWEEN 1.0 AND 2.0 AND quantity BETWEEN 1000 AND 2000")
+ val results5 = spark.sql(s"SELECT * FROM $globalTempViewName WHERE (price BETWEEN 1 AND 2) AND (quantity BETWEEN 1000 AND 2000)")
+ println("SHOWING: all the products which has price BETWEEN 1.0 AND 2.0 AND quantity BETWEEN 1000 AND 2000")
+ results5.show()
+
+ //Select all the products which has product code as null
+ println("SELECTING : all the products which has product code as null")
+ val results6 = spark.sql(s"SELECT * FROM $globalTempViewName WHERE productCode IS NULL")
+ println("SHOWING: all the products which has product code as null")
+ results6.show()
+
+    //Select all the products whose name starts with Pen, ordered by price in descending order.
+    println("SELECTING: all the products whose name starts with Pen, ordered by price descending")
+    val results7 = spark.sql(s"SELECT * FROM $globalTempViewName WHERE name LIKE 'Pen%' ORDER BY price DESC")
+    println("SHOWING: all the products whose name starts with Pen, ordered by price descending")
+ results7.show()
+
+    //Select all the products whose name starts with Pen, ordered by price descending and quantity ascending.
+    println("SELECTING: all the products whose name starts with Pen, ordered by price descending and quantity ascending")
+    val results8 = spark.sql(s"SELECT * FROM $globalTempViewName WHERE name LIKE 'Pen%' ORDER BY price DESC, quantity ASC")
+    println("SHOWING: all the products whose name starts with Pen, ordered by price descending and quantity ascending")
+ results8.show()
+
+ //Select top 2 products by price
+ println("SELECTING : top 2 products by price")
+ val results9 = spark.sql(s"SELECT * FROM $globalTempViewName ORDER BY price DESC LIMIT 2")
+ println("SHOWING: top 2 products by price")
+ results9.show()
+
+    //Select all the columns from the product table with output headers: productID AS ID, productCode AS Code, name AS Description, price AS 'Unit Price'
+    println("SELECTING: all the columns with headers ID, Code, Description, Unit Price")
+    val results10 = spark.sql(s"SELECT productID AS ID, productCode AS Code, name AS Description, price AS `Unit Price` FROM $globalTempViewName")
+    println("SHOWING: all the columns with headers ID, Code, Description, Unit Price")
+ results10.show()
+
+    //Select productCode and name separated by '-' with the header named ProductDescription
+    println("SELECTING: productCode and name separated by '-' with header ProductDescription")
+    val results11 = spark.sql(s"SELECT CONCAT(productCode, '-', name) AS ProductDescription FROM $globalTempViewName")
+    println("SHOWING: productCode and name separated by '-' with header ProductDescription")
+ results11.show()
+
+ //Select all distinct prices
+ println("SELECTING : all distinct prices")
+ val results12 = spark.sql(s"SELECT DISTINCT price AS Distinct_Price FROM $globalTempViewName")
+ println("SHOWING: all distinct prices")
+ results12.show()
+
+ //Select distinct price and name combination
+ println("SELECTING : distinct price and name combination")
+ val results13 = spark.sql(s"SELECT DISTINCT price, name FROM $globalTempViewName")
+ println("SHOWING: distinct price and name combination")
+ results13.show()
+
+    //Select all price data sorted by the combination of productCode and productID
+    println("SELECTING: all price data sorted by productCode and productID")
+    val results15 = spark.sql(s"SELECT * FROM $globalTempViewName ORDER BY productCode, productID")
+    println("SHOWING: all price data sorted by productCode and productID")
+ results15.show()
+
+
+    //Count the number of products for each product code
+    println("SELECTING: the number of products for each product code")
+    val results16 = spark.sql(s"SELECT productCode, COUNT(*) AS product_count FROM $globalTempViewName GROUP BY productCode")
+    println("SHOWING: the number of products for each product code")
+ results16.show()
+
+    //save dataframe to Hive table in ORC format
+ writeDataFrameToHiveTable(productDF, SaveMode.Overwrite, "orc", "product_orc_table")
+
+    //save dataframe to Hive table in Parquet format
+ writeDataFrameToHiveTable(productDF, SaveMode.Overwrite, "parquet", "product_parquet_table")
+
+ }
+
+ def writeDataFrameToHiveTable(inputDF: DataFrame, saveMode: SaveMode, dataFormat: String, hiveTableName: String) = {
+ println(s"Starting to write dataframe to hive table with the following data format $dataFormat and hive table name: $hiveTableName")
+ //match cases: json, parquet, jdbc, orc, libsvm, csv, text
+ dataFormat match {
+ case "json" => inputDF.write.mode(saveMode).format("json").saveAsTable(hiveTableName)
+ case "parquet" => inputDF.write.mode(saveMode).format("parquet").saveAsTable(hiveTableName)
+ case "jdbc" => inputDF.write.mode(saveMode).format("jdbc").saveAsTable(hiveTableName)
+ case "orc" => inputDF.write.mode(saveMode).format("orc").saveAsTable(hiveTableName)
+      case "csv" => inputDF.write.mode(saveMode).format("csv").saveAsTable(hiveTableName)
+ case "text" => inputDF.write.mode(saveMode).format("text").saveAsTable(hiveTableName)
+ case "libsvm" => inputDF.write.mode(saveMode).format("libsvm").saveAsTable(hiveTableName)
+      case _ => println("Invalid dataFormat. Allowed formats are: json, parquet, jdbc, orc, csv, text or libsvm") // the default, catch-all
+ }
+
+    println(s"End write dataframe to hive table with the following data format $dataFormat and hive table name: $hiveTableName")
+
+ }
+
+
+}