From cc3678a5d9c971e5029d61cbeeda61ccab15052f Mon Sep 17 00:00:00 2001 From: "James W. Kimani" Date: Sat, 10 Feb 2018 16:22:17 -0500 Subject: [PATCH] repo restructure Signed-off-by: James W. Kimani --- .gitignore | 60 ++++ README.md | 53 +++- build.sbt | 34 +++ config/test_linux/application.conf | 0 config/test_linux/log4j.properties | 12 + config/test_windows/application.conf | 0 config/test_windows/log4j.properties | 12 + insight_data/patients.csv | 7 + insight_data/products.csv | 11 + insight_data/products_suppliers.csv | 6 + insight_data/supplier.csv | 6 + .../scala_problem_scenario_1-products.md | 259 ++++++++++++++++++ .../scala_problem_scenario_2-patient_data.md | 18 ++ .../scala_problem_scenario_rdd_join.md | 25 ++ project/assembly.sbt | 2 + project/build.properties | 1 + .../create_products_parquet_table_in_hive.sh | 10 + scripts/create_products_table_in_hive.sh | 10 + ...148-858e-4742-81f4-b40760a34acf.snappy.orc | Bin 0 -> 904 bytes ...ec5b-42b9-be4b-fa3930679e14.snappy.parquet | Bin 0 -> 1308 bytes .../driver/run_problem_scenario_one.scala | 24 ++ .../scala/products/problem_scenario_One.scala | 160 +++++++++++ 22 files changed, 709 insertions(+), 1 deletion(-) create mode 100644 .gitignore create mode 100644 build.sbt create mode 100644 config/test_linux/application.conf create mode 100644 config/test_linux/log4j.properties create mode 100644 config/test_windows/application.conf create mode 100644 config/test_windows/log4j.properties create mode 100644 insight_data/patients.csv create mode 100644 insight_data/products.csv create mode 100644 insight_data/products_suppliers.csv create mode 100644 insight_data/supplier.csv create mode 100644 problem_scenarios/scala_problem_scenario_1-products.md create mode 100644 problem_scenarios/scala_problem_scenario_2-patient_data.md create mode 100644 problem_scenarios/scala_problem_scenario_rdd_join.md create mode 100644 project/assembly.sbt create mode 100644 project/build.properties create mode 100644 scripts/create_products_parquet_table_in_hive.sh create mode 100644 scripts/create_products_table_in_hive.sh create mode 100644 spark-warehouse/product_orc_table/part-00000-6c329148-858e-4742-81f4-b40760a34acf.snappy.orc create mode 100644 spark-warehouse/product_parquet_table/part-00000-fa21c8aa-ec5b-42b9-be4b-fa3930679e14.snappy.parquet create mode 100644 src/main/scala/com/jwk/development/big_data_insights/scala/products/driver/run_problem_scenario_one.scala create mode 100644 src/main/scala/com/jwk/development/big_data_insights/scala/products/problem_scenario_One.scala diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..613d4a3 --- /dev/null +++ b/.gitignore @@ -0,0 +1,60 @@ +RemoteSystemsTempFiles/ +Servers/ +target/ +logs/ +.metadata/ +bin/ +tmp/ +*.tmp +*.bak +*.swp +*~.nib +local.properties +.settings/ +.loadpath +.recommenders +.idea/ +.project +classes/ +.classpath +.iml +*_SUCCESS* +*.crc + +# External tool builders +.externalToolBuilders/ + +# Locally stored "Eclipse launch configurations" +*.launch + +# PyDev specific (Python IDE for Eclipse) +*.pydevproject + +# CDT-specific (C/C++ Development Tooling) +.cproject + +# Java annotation processor (APT) +.factorypath + +# PDT-specific (PHP Development Tools) +.buildpath + +# sbteclipse plugin +.target + +# Tern plugin +.tern-project + +# TeXlipse plugin +.texlipse + +# STS (Spring Tool Suite) +.springBeans + +# Code Recommenders +.recommenders/ + +# Scala IDE specific (Scala & Java development for Eclipse) +.cache-main +.scala_dependencies +.worksheet diff 
--git a/README.md b/README.md
index a4d0f2a..56e550a 100644
--- a/README.md
+++ b/README.md
@@ -1,2 +1,53 @@
 # big-data-insights-scala
-personal solutions to big data problem scenarios using scala
+personal solutions to big data problem scenarios using scala
+
+## Project Structure
+Each package is based on a problem scenario.
+
+Each problem scenario has a main class in the *com.jwk.development.big_data_insights.scala.products.driver* package.
+
+Each problem contains a detailed problem scenario and a result sheet.
+
+### 1. Product Data for a pen company
+
+   Problem: Given CSV files with product information from a pen company, provide some insights using big data technologies
+
+   Package name: *com.jwk.development.big_data_insights.scala.products.problem_scenario_One*
+
+   Driver/Main class: *com.jwk.development.big_data_insights.scala.products.driver.run_problem_scenario_one*
+
+   Link to result sheet and detailed problem scenarios:
+
+   [Part One]()
+
+   [Part Two]()
+
+   [Part Three]()
+
+### 2. Patient Data
+
+   Problem: **
+
+   Package name: **
+
+   Driver/Main class: **
+
+   Link to result sheet and detailed problem scenarios:
+
+
+## Troubleshooting
+1. If the following error occurs when running an application: *A master URL must be set in your configuration*
+   ```
+   Exception in thread "main" java.lang.ExceptionInInitializerError
+   at com.jwk.development.big_data_insights.scala.products.driver.problem_scenario_1.main(problem_scenario_1.scala)
+   Caused by: org.apache.spark.SparkException: A master URL must be set in your configuration
+   ```
+
+   Solution:
+
+   Add the following VM option to your run configuration
+   ```
+   -Dspark.master=local
+   ```
+   [link to setting spark master to local in intellij]()
+
diff --git a/build.sbt b/build.sbt
new file mode 100644
index 0000000..a567064
--- /dev/null
+++ b/build.sbt
@@ -0,0 +1,34 @@
+name := "big-data-insights-scala"
+
+version := "1.0"
+
+scalaVersion := "2.11.8"
+
+libraryDependencies ++= Seq(
+  "org.apache.hadoop" % "hadoop-client" % "2.7.3",
+  "org.apache.spark" % "spark-core_2.11" % "2.1.0",
+  "org.apache.spark" % "spark-sql_2.11" % "2.1.0",
+  "org.apache.spark" % "spark-hive_2.11" % "2.1.0",
+  "com.databricks" % "spark-avro_2.11" % "3.2.0",
+  "com.databricks" % "spark-csv_2.11" % "1.3.0",
+  "org.scala-lang" % "scala-library" % "2.11.8",
+  "org.scala-lang" % "scala-reflect" % "2.11.8",
+  "com.typesafe" % "config" % "1.3.1",
+  "org.apache.logging.log4j" %% "log4j-api-scala" % "2.8.1",
+  "org.apache.logging.log4j" % "log4j-core" % "2.8.1",
+  "org.apache.kafka" %% "kafka" % "0.9.0.2.3.4.51-1"
+)
+//use external repositories
+resolvers += "HortonWorksRepo" at "http://repo.hortonworks.com/content/repositories/releases/"
+
+parallelExecution in Test := false
+
+
+initialCommands := "import org.test._"
+
+//clean operations
+cleanFiles += baseDirectory { base => base / "build" }.value
+cleanFiles += baseDirectory { base => base / "metastore_db" }.value
+
+//assembly-settings
\ No newline at end of file
diff --git a/config/test_linux/application.conf b/config/test_linux/application.conf
new file mode 100644
index 0000000..e69de29
diff --git a/config/test_linux/log4j.properties b/config/test_linux/log4j.properties
new file mode 100644
index 0000000..117529f
--- /dev/null
+++ b/config/test_linux/log4j.properties
@@ -0,0 +1,12 @@
+# Set root logger level to ERROR and its only appender to A1.
+log4j.rootLogger=ERROR, A1 +# If we get chained appenders, this stops the message being written multiple times +log4j.additivity.org.apache=false +log4j.additivity.xdasLogger=false +# A1 is set to be a ConsoleAppender. +log4j.appender.A1=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.Target=System.out +# A1 uses PatternLayout. +log4j.appender.A1.layout=org.apache.log4j.PatternLayout +log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n + diff --git a/config/test_windows/application.conf b/config/test_windows/application.conf new file mode 100644 index 0000000..e69de29 diff --git a/config/test_windows/log4j.properties b/config/test_windows/log4j.properties new file mode 100644 index 0000000..117529f --- /dev/null +++ b/config/test_windows/log4j.properties @@ -0,0 +1,12 @@ +# Set root logger level to DEBUG and its only appender to A1. +log4j.rootLogger=ERROR, A1 +# If we get chained appenders, this stops the message being written multiple times +log4j.additivity.org.apache=false +log4j.additivity.xdasLogger=false +# A1 is set to be a ConsoleAppender. +log4j.appender.A1=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.Target=System.out +# A1 uses PatternLayout. +log4j.appender.A1.layout=org.apache.log4j.PatternLayout +log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n + diff --git a/insight_data/patients.csv b/insight_data/patients.csv new file mode 100644 index 0000000..65b0e29 --- /dev/null +++ b/insight_data/patients.csv @@ -0,0 +1,7 @@ +patientID,name ,address ,dateOfBirth,lastVisitDate +1001 ,Homer Simpson ,"123 Blue St.,Los Angeles, CA 12345" ,1989-12-31 ,2017-01-21 +1002 ,Peter Griffin ,"234 Brown St., San Fransisco, CA 23456",1950-01-30 ,2015-04-18 +1003 ,Hubert J. Fansworth,"546 Red Dr., Sacramento, CA 54678" ,1978-08-21 ,2017-02-14 +1004 ,Marge Simpson ,"123 Blue St.,Los Angeles, CA 12345" ,1990-03-18 ,2016-02-15 +1005 ,Bender Rodriguez ,"127 Brown St., Charlotte, NC 28223" ,1986-12-31 ,2013-12-14 +1006 ,Turanga Leela ,"128 Brown St., Charlotte, NC 28223" ,1978-08-21 ,2012-09-15 diff --git a/insight_data/products.csv b/insight_data/products.csv new file mode 100644 index 0000000..f535003 --- /dev/null +++ b/insight_data/products.csv @@ -0,0 +1,11 @@ +productID productCode name quantity price supplierid +1001 PEN Pen Red 5000 1.23 501 +1002 PEN Pen Blue 8001 1.25 501 +1003 PEN Pen Black 2000 1.25 501 +1004 PEC Pencil 2B 10000 0.48 502 +1005 PEC Pencil 2H 8000 0.49 502 +1006 PEC Pencil HB 0 9999.99 502 +2001 PEC Pencil 3B 500 0.52 501 +2002 PEC Pencil 4B 200 0.62 501 +2003 PEC Pencil 5B 100 0.73 501 +2004 PEC Pencil 6B 500 0.47 502 diff --git a/insight_data/products_suppliers.csv b/insight_data/products_suppliers.csv new file mode 100644 index 0000000..da4082b --- /dev/null +++ b/insight_data/products_suppliers.csv @@ -0,0 +1,6 @@ +productID,supplierID +2001 ,501 +2002 ,501 +2003 ,501 +2004 ,502 +2001 ,503 diff --git a/insight_data/supplier.csv b/insight_data/supplier.csv new file mode 100644 index 0000000..5d2a943 --- /dev/null +++ b/insight_data/supplier.csv @@ -0,0 +1,6 @@ +supplierID,name ,phone +501 ,ABC Traders,88881111 +502 ,XYZ Company,88882222 +503 ,QQ Corp ,88883333 +504 ,DEG LLC ,88884444 +505 ,FGH Limited,88885555 diff --git a/problem_scenarios/scala_problem_scenario_1-products.md b/problem_scenarios/scala_problem_scenario_1-products.md new file mode 100644 index 0000000..4f2dc24 --- /dev/null +++ b/problem_scenarios/scala_problem_scenario_1-products.md @@ -0,0 +1,259 @@ +#Problem Scenario 1 +This problem has three 
parts:
+
+1. [Part One](#part1)
+
+2. [Part Two](#part2)
+
+3. [Part Three](#part3)
+
+##Part One
+You have the following file [link to /insight_data/products.csv](#/insight_data/products.csv)
+
+```
+products.csv
+
++---------+-----------+---------+--------+-------+
+|productID|productCode|     name|quantity|  price|
++---------+-----------+---------+--------+-------+
+|     1001|        PEN|  Pen Red|    5000|   1.23|
+|     1002|        PEN| Pen Blue|    8001|   1.25|
+|     1003|        PEN|Pen Black|    2000|   1.25|
+|     1004|        PEC|Pencil 2B|   10000|   0.48|
+|     1005|        PEC|Pencil 2H|    8000|   0.49|
+|     1006|        PEC|Pencil HB|       0|9999.99|
+|     2001|        PEC|Pencil 3B|     500|   0.52|
+|     2002|        PEC|Pencil 4B|     200|   0.62|
+|     2003|        PEC|Pencil 5B|     100|   0.73|
+|     2004|        PEC|Pencil 6B|     500|   0.47|
++---------+-----------+---------+--------+-------+
+```
+Using Spark and SparkSQL, perform the following tasks:
+1. Load the csv file to a dataframe
+
+    Using schema:
+    ```
+    val schema =
+      StructType(
+        Array(
+          StructField("productID", IntegerType, false),
+          StructField("productCode", StringType, false),
+          StructField("name", StringType, false),
+          StructField("quantity", IntegerType, false),
+          StructField("price", FloatType, false)
+        )
+      )
+    ```
+    Approach:
+    ```
+    val productDF = sqlContext.read.format("com.databricks.spark.csv").option("delimiter","\t").option("header","true").option("inferSchema", "false").schema(schema).load(path)
+    ```
+
+2. Create a global temporary view named `products` from the dataframe and its data
+
+    `productDF.createGlobalTempView("products")`
+
+3. Using the global temporary view, perform the tasks below; a minimal sketch of issuing a query against the view is shown next.
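+
+    For example (a minimal sketch, mirroring the solution class in this patch): the view is
+    registered in the `global_temp` database, so queries must qualify it as `global_temp.products`.
+    ```
+    val globalTempViewName = "global_temp.products"
+    val bigPenStock = spark.sql(s"SELECT * FROM $globalTempViewName WHERE quantity >= 5000 AND name LIKE 'Pen%'")
+    bigPenStock.show()
+    ```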
+
+4. Select and show all the records with quantity >= 5000 and a name starting with 'Pen'
+    ```
+    +---------+-----------+---------+--------+-----+
+    |productID|productCode|     name|quantity|price|
+    +---------+-----------+---------+--------+-----+
+    |     1001|        PEN|  Pen Red|    5000| 1.23|
+    |     1002|        PEN| Pen Blue|    8001| 1.25|
+    |     1004|        PEC|Pencil 2B|   10000| 0.48|
+    |     1005|        PEC|Pencil 2H|    8000| 0.49|
+    +---------+-----------+---------+--------+-----+
+    ```
+
+5. Select and show all the records with quantity >= 5000, price less than 1.24, and a name starting with 'Pen'
+    ```
+    +---------+-----------+---------+--------+-----+
+    |productID|productCode|     name|quantity|price|
+    +---------+-----------+---------+--------+-----+
+    |     1001|        PEN|  Pen Red|    5000| 1.23|
+    |     1004|        PEC|Pencil 2B|   10000| 0.48|
+    |     1005|        PEC|Pencil 2H|    8000| 0.49|
+    +---------+-----------+---------+--------+-----+
+    ```
+
+6. Select and show all the records which do not have both quantity >= 5000 and a name starting with 'Pen'
+    ```
+    +---------+-----------+---------+--------+-------+
+    |productID|productCode|     name|quantity|  price|
+    +---------+-----------+---------+--------+-------+
+    |     1003|        PEN|Pen Black|    2000|   1.25|
+    |     1006|        PEC|Pencil HB|       0|9999.99|
+    |     2001|        PEC|Pencil 3B|     500|   0.52|
+    |     2002|        PEC|Pencil 4B|     200|   0.62|
+    |     2003|        PEC|Pencil 5B|     100|   0.73|
+    |     2004|        PEC|Pencil 6B|     500|   0.47|
+    +---------+-----------+---------+--------+-------+
+    ```
+
+7. Select and show all the products whose name is 'Pen Red' or 'Pen Black'
+    ```
+    +---------+-----------+---------+--------+-----+
+    |productID|productCode|     name|quantity|price|
+    +---------+-----------+---------+--------+-----+
+    |     1001|        PEN|  Pen Red|    5000| 1.23|
+    |     1003|        PEN|Pen Black|    2000| 1.25|
+    +---------+-----------+---------+--------+-----+
+    ```
+8. Select and show all the products which have price BETWEEN 1.0 AND 2.0 AND quantity BETWEEN 1000 AND 2000
+    ```
+    +---------+-----------+---------+--------+-----+
+    |productID|productCode|     name|quantity|price|
+    +---------+-----------+---------+--------+-----+
+    |     1003|        PEN|Pen Black|    2000| 1.25|
+    +---------+-----------+---------+--------+-----+
+    ```
+
+9. Select all the products whose product code is null
+
+    ```
+    +---------+-----------+----+--------+-----+
+    |productID|productCode|name|quantity|price|
+    +---------+-----------+----+--------+-----+
+    +---------+-----------+----+--------+-----+
+    ```
+
+10. Select all the products whose name starts with Pen; the results should be ordered by price in descending order.
+
+    ```
+    +---------+-----------+---------+--------+-------+
+    |productID|productCode|     name|quantity|  price|
+    +---------+-----------+---------+--------+-------+
+    |     1006|        PEC|Pencil HB|       0|9999.99|
+    |     1003|        PEN|Pen Black|    2000|   1.25|
+    |     1002|        PEN| Pen Blue|    8001|   1.25|
+    |     1001|        PEN|  Pen Red|    5000|   1.23|
+    |     2003|        PEC|Pencil 5B|     100|   0.73|
+    |     2002|        PEC|Pencil 4B|     200|   0.62|
+    |     2001|        PEC|Pencil 3B|     500|   0.52|
+    |     1005|        PEC|Pencil 2H|    8000|   0.49|
+    |     1004|        PEC|Pencil 2B|   10000|   0.48|
+    |     2004|        PEC|Pencil 6B|     500|   0.47|
+    +---------+-----------+---------+--------+-------+
+    ```
+
+11. Select all the products whose name starts with Pen; the results should be ordered by price in descending order and quantity in ascending order.
+
+    ```
+    +---------+-----------+---------+--------+-------+
+    |productID|productCode|     name|quantity|  price|
+    +---------+-----------+---------+--------+-------+
+    |     1006|        PEC|Pencil HB|       0|9999.99|
+    |     1003|        PEN|Pen Black|    2000|   1.25|
+    |     1002|        PEN| Pen Blue|    8001|   1.25|
+    |     1001|        PEN|  Pen Red|    5000|   1.23|
+    |     2003|        PEC|Pencil 5B|     100|   0.73|
+    |     2002|        PEC|Pencil 4B|     200|   0.62|
+    |     2001|        PEC|Pencil 3B|     500|   0.52|
+    |     1005|        PEC|Pencil 2H|    8000|   0.49|
+    |     1004|        PEC|Pencil 2B|   10000|   0.48|
+    |     2004|        PEC|Pencil 6B|     500|   0.47|
+    +---------+-----------+---------+--------+-------+
+    ```
+
+12. Select the top 2 products by price
+
+    ```
+    +---------+-----------+---------+--------+-------+
+    |productID|productCode|     name|quantity|  price|
+    +---------+-----------+---------+--------+-------+
+    |     1006|        PEC|Pencil HB|       0|9999.99|
+    |     1002|        PEN| Pen Blue|    8001|   1.25|
+    +---------+-----------+---------+--------+-------+
+    ```
+
+
+13. Select all the columns from the product table with the output headers as below.
+    `
+    productID AS ID
+    code AS Code
+    name AS Description
+    price AS 'Unit Price'
+    `
+    ```
+
+    ```
+
+14. Select code and name, both separated by '-', and the header name should be 'ProductDescription'
+
+    ```
+    ```
+
+15. Select all distinct prices.
+
+    ```
+    ```
+
+16. Select distinct price and name combinations
+
+    ```
+    ```
+
+17. Select all price data sorted by code and productID in combination
+
+    ```
+    ```
+
+18. Count the number of products.
+
+    ```
+    ```
+
+19. Count the number of products for each code
+
+    ```
+    ```
+
+
+20. Select maximum, minimum, average, standard deviation, and total quantity
+
+21. Select the minimum and maximum price for each product code.
+
+22. Select maximum, minimum, average, standard deviation, and total quantity for each product code; make sure that the average and standard deviation have at most two decimal places.
+
+23. Select the product code and average price only where the product count is greater than or equal to 3
+
+24. Select the maximum, minimum, average, and total of all the products for each code. Also produce the same across all the products (see the sketch after this list)
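+
+A minimal sketch of how the aggregate tasks (20-24) could be expressed against the view registered in task 2, assuming a SparkSession named `spark` and Spark SQL's built-in MAX, MIN, AVG, STDDEV, SUM, and ROUND functions:
+```
+// per product code, with average and standard deviation rounded to two decimals
+val aggregatesPerCode = spark.sql(
+  """SELECT productCode,
+    |       MAX(price) AS max_price,
+    |       MIN(price) AS min_price,
+    |       ROUND(AVG(price), 2) AS avg_price,
+    |       ROUND(STDDEV(price), 2) AS stddev_price,
+    |       SUM(quantity) AS total_quantity
+    |FROM global_temp.products
+    |GROUP BY productCode""".stripMargin)
+aggregatesPerCode.show()
+
+// product codes with at least 3 products, together with their average price
+val avgPriceForFrequentCodes = spark.sql(
+  """SELECT productCode, ROUND(AVG(price), 2) AS avg_price
+    |FROM global_temp.products
+    |GROUP BY productCode
+    |HAVING COUNT(*) >= 3""".stripMargin)
+avgPriceForFrequentCodes.show()
+```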
+
+
+##Part Two
+You have been provided two additional files:
+
+1. [link to insight_data/products_suppliers.csv]()
+
+2. [link to insight_data/supplier.csv]()
+
+
+Now accomplish the following queries.
+
+1. Select each product, its price, and its supplier name where the product price is less than 0.6, using SparkSQL
+
+
+2. It is possible that the same product is supplied by multiple suppliers. Now find each product and its price according to
+each supplier.
+
+3. Find all the supplier names that supply 'Pencil 3B'
+
+4. Find all the products that are supplied by ABC Traders
+
+
+##Part Three
+1. Create a Hive ORC table using SparkSQL
+
+2. Load the product data into the Hive table.
+
+3. Create a Hive parquet table using SparkSQL and load data into it.
+
+
+##Developer Notes:
+Add the following VM option arguments to set the Spark master and the Hadoop home directory:
+```
+-Dspark.master=local
+-Dhadoop.home.dir=C:\hadoop-2.7.4
+```
diff --git a/problem_scenarios/scala_problem_scenario_2-patient_data.md b/problem_scenarios/scala_problem_scenario_2-patient_data.md
new file mode 100644
index 0000000..719a2d1
--- /dev/null
+++ b/problem_scenarios/scala_problem_scenario_2-patient_data.md
@@ -0,0 +1,18 @@
+#Problem Scenario 2
+You have been given the following file containing patient data:
+[link to /insight_data/patients.csv]
+
+Accomplish the following activities:
+
+1. Find all the patients whose lastVisitDate is between the current date and '2012-09-15'
+
+2. Find all the patients who were born in 1990
+
+3. Find the age of each patient
+
+4. List patients whose last visit was more than 60 days ago
+
+5. Select patients who are 18 years old or younger
+
+
+
diff --git a/problem_scenarios/scala_problem_scenario_rdd_join.md b/problem_scenarios/scala_problem_scenario_rdd_join.md
new file mode 100644
index 0000000..0e89871
--- /dev/null
+++ b/problem_scenarios/scala_problem_scenario_rdd_join.md
@@ -0,0 +1,25 @@
+#Problem Scenario: RDD join
+
+You have been given the code snippet below:
+
+```
+val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
+
+val b = a.keyBy(_.length)
+
+val c = sc.parallelize(List("dog", "cat", "gnu", "salmon", "rabbit"), 3)
+
+val d = c.keyBy(_.length)
+
+```
+
+Write a correct code snippet for operation1 which will produce the desired output, shown below.
+ +``` +Array[(lnt, (String, String))] = + +(6 , (salmon,rabbit)) + +(3,(dog,dog)), (3 ,(dog,cat)), (3 ,(dog,gnu)), +``` + diff --git a/project/assembly.sbt b/project/assembly.sbt new file mode 100644 index 0000000..7d2ba3f --- /dev/null +++ b/project/assembly.sbt @@ -0,0 +1,2 @@ +addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.12.0") +resolvers += Resolver.url("bintray-sbt-plugins", url("http://dl.bintray.com/sbt/sbt-plugin-releases"))(Resolver.ivyStylePatterns) \ No newline at end of file diff --git a/project/build.properties b/project/build.properties new file mode 100644 index 0000000..64317fd --- /dev/null +++ b/project/build.properties @@ -0,0 +1 @@ +sbt.version=0.13.15 diff --git a/scripts/create_products_parquet_table_in_hive.sh b/scripts/create_products_parquet_table_in_hive.sh new file mode 100644 index 0000000..b164280 --- /dev/null +++ b/scripts/create_products_parquet_table_in_hive.sh @@ -0,0 +1,10 @@ +#!/usr/bin/env bash + +hive -e +" +CREATE EXTERNAL TABLE products_orc (productid int, code string, name string, quantity int, price float) +STORED AS parquet +LOCATION /user/hive/warehouse/product_parquet_table +;" + +#"select * from product_parquet_table;" \ No newline at end of file diff --git a/scripts/create_products_table_in_hive.sh b/scripts/create_products_table_in_hive.sh new file mode 100644 index 0000000..b28e0e0 --- /dev/null +++ b/scripts/create_products_table_in_hive.sh @@ -0,0 +1,10 @@ +#!/usr/bin/env bash + +hive -e +" +CREATE EXTERNAL TABLE products_orc (productid int, code string, name string, quantity int, price float) +STORED AS orc +LOCATION /user/hive/warehouse/product_orc_table +;" + +#select * from product_orc_table \ No newline at end of file diff --git a/spark-warehouse/product_orc_table/part-00000-6c329148-858e-4742-81f4-b40760a34acf.snappy.orc b/spark-warehouse/product_orc_table/part-00000-6c329148-858e-4742-81f4-b40760a34acf.snappy.orc new file mode 100644 index 0000000000000000000000000000000000000000..66e265b5a1ca15798ad10adb5f69a30636f2b598 GIT binary patch literal 904 zcmah{KWGzC82`Szd&#@k#9R)+moua>O9O2&SgUeK5-(aKl=O`FZ&5;eLulH@(xP?> z5$)h$HWAXN8C&ZJb37hSkX)1+XuNxOqI zj|koM=HgDZRGu;0ZMjm7=N_OVyl>0LcQV8h zHvX<`W+PBt~kh!>xKr}Mk}=|7q1L^H-R#bzv%-?cMe-n7y_gf)x`ANLs`2N>rl zqm&gHqpZqN#z70pB$Fd_2Mv^x@w_7abW#jO`lzZ@btdU{mPt_A9Rm$bMwm=7IczE% zC=^q}6cc#NrL$#Rn)Mbc<cyGNsaghsos#*IT7?fx3Gs|Ef8 X$+CLgNGK3Uu3!oqzf^Uqn2Y=ayhpmU literal 0 HcmV?d00001 diff --git a/spark-warehouse/product_parquet_table/part-00000-fa21c8aa-ec5b-42b9-be4b-fa3930679e14.snappy.parquet b/spark-warehouse/product_parquet_table/part-00000-fa21c8aa-ec5b-42b9-be4b-fa3930679e14.snappy.parquet new file mode 100644 index 0000000000000000000000000000000000000000..b1d6f1bd65fc72fbd4c8d99cb6da03eabb83777f GIT binary patch literal 1308 zcmbVM&1=(O7=MzbTe`I}uy05uZY8+tpoVqr+*Zo!x+xuqlpS^A(&o(!OFx#T${0He z!;Ag{4$+-Fdl())h)2Je=fN(1;z30`h|imJTiHa^z>~Zmzu)tF-sks{;^b_S0GT86 z2@=6UClM7{c-;jc3vW9BB)~^|#N#`{v4wXS?=e1Le8l*K@dDv5Fq@nnN*D=k>0)v{&B3VE3g zV|iOIFM7nA+_cZc0)TDJQbP>)^ycj*4U&+ExNtbaxWjoZ3{W^8hJtnsAjM{Be2tCd zgFcs`&g8wm%(aONy(d{;5Yy_*ysx-+6kxI+;5kY&$k~1faM>>$z%zsz7;Jnv-M5@_ zUKZXw_&njadB{(%a*RUws9gv(>bkRd zwNbuCD>~^0!VmCruW)>WAVs1|bS^F`s5H_EVzezW;iZVQ&Pn&VxR^b3U*Zorg@lkKAgNv5qpfk$9S$K` zW_p|R@yEQ+Vp3jx9(;$FesZ{%^$h>L_wh|(zskMS>=wDs;XYP*z`giZKp|r5X#Sbo z7+V2k956_F+9gFGs>p*iXF=2JI<{T&9MBp|wuV+LHL0sPwHsQ6y1Jpey1c`$sP1x| zsu{K6I_Qd;P)&>4Mnlb9UQwA1xV7gP?zYfY)ujv6!G&tmw)K*YI7~nj>TYf|v$DFH zIGBUEyq^U%t?I(xC-LgptKnbSyZ2~c|36NFcW<9>+BKcE?2kRTvI_D{tNW)bPV72$ z8dj|uOKM|UGIpxev}_}m)=PTY(CKKYG-RZyQ7W6G { + 
println(this.getClass.getName() + ". Error during transformation. Root cause: " + ex.getMessage())
+      }
+    }
+    println("End " + this.getClass.getName() + " : " + new Date())
+  }
+}
diff --git a/src/main/scala/com/jwk/development/big_data_insights/scala/products/problem_scenario_One.scala b/src/main/scala/com/jwk/development/big_data_insights/scala/products/problem_scenario_One.scala
new file mode 100644
index 0000000..afec91e
--- /dev/null
+++ b/src/main/scala/com/jwk/development/big_data_insights/scala/products/problem_scenario_One.scala
@@ -0,0 +1,160 @@
+package com.jwk.development.big_data_insights.scala.products
+
+import org.apache.spark.sql.types._
+import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
+
+class problem_scenario_One {
+  val spark: SparkSession = SparkSession.builder.appName("products_application").config("spark.serializer", "org.apache.spark.serializer.KryoSerializer").getOrCreate()
+  val sparkContext = spark.sparkContext
+  //legacy SQLContext, kept for the com.databricks.spark.csv reader used below
+  val sqlContext = new org.apache.spark.sql.SQLContext(sparkContext)
+
+
+  def problem_scenario1(path: String): Unit = {
+    //val tab_delimited_Header= "productID\tproductCode\tname\tquantity\tprice\tsupplierid"
+    //val comma_delimited_Header= "productID,productCode,name,quantity,price,supplierid"
+
+    //define schema of csv file
+    val schema =
+      StructType(
+        Array(
+          StructField("productID", IntegerType, false),
+          StructField("productCode", StringType, false),
+          StructField("name", StringType, false),
+          StructField("quantity", IntegerType, false),
+          StructField("price", FloatType, false)
+        )
+      )
+
+    //read csv file from directory path using schema
+    val productDF = sqlContext.read.format("com.databricks.spark.csv").option("delimiter", "\t").option("header", "true").option("inferSchema", "false").schema(schema).load(path)
+    //show first 10 records in the dataframe
+    productDF.show(10)
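+    //(Alternative sketch, not used below: Spark 2.x can read the tab-delimited file natively,
+    // without the com.databricks.spark.csv package)
+    //val productDFNative = spark.read.option("sep", "\t").option("header", "true").schema(schema).csv(path)
+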
+    // Register the DataFrame as a global temporary view
+    val tempTableName = "products"
+    productDF.createGlobalTempView(tempTableName)
+    //global temporary views live in the global_temp database, so queries must qualify the view name
+    val globalTempViewName = s"global_temp.$tempTableName"
+
+    //import apache spark sql
+    import org.apache.spark.sql._
+
+    //The following answers PART ONE questions of the problem scenario.
+    //1. Select all the records with quantity >= 5000 and name starting with 'Pen'
+    println("SELECTING: all the records with quantity >= 5000 and name starting with 'Pen'")
+    val results1 = spark.sql(s"SELECT * FROM $globalTempViewName WHERE quantity >= 5000 AND name LIKE 'Pen%'")
+    println("SHOWING: all the records with quantity >= 5000 and name starting with 'Pen'")
+    results1.show()
+
+    //2. Select all the records with quantity >= 5000, price less than 1.24 and name starting with 'Pen'
+    println("SELECTING: all the records with quantity >= 5000, price less than 1.24 and name starting with 'Pen'")
+    val results2 = spark.sql(s"SELECT * FROM $globalTempViewName WHERE quantity >= 5000 AND price < 1.24 AND name LIKE 'Pen%'")
+    println("SHOWING: all the records with quantity >= 5000, price less than 1.24 and name starting with 'Pen'")
+    results2.show()
+
+    //3. Select all the records which do not have both quantity >= 5000 and a name starting with 'Pen'
+    println("SELECTING: all the records which do not have both quantity >= 5000 and a name starting with 'Pen'")
+    val results3 = spark.sql(s"SELECT * FROM $globalTempViewName WHERE NOT (quantity >= 5000 AND name LIKE 'Pen%')")
+    println("SHOWING: all the records which do not have both quantity >= 5000 and a name starting with 'Pen'")
+    results3.show()
+
+    //4. Select all the products whose name is 'Pen Red' or 'Pen Black'
+    println("SELECTING: all the products whose name is 'Pen Red' or 'Pen Black'")
+    val results4 = spark.sql(s"SELECT * FROM $globalTempViewName WHERE name IN ('Pen Red', 'Pen Black')")
+    println("SHOWING: all the products whose name is 'Pen Red' or 'Pen Black'")
+    results4.show()
+
+    //5. Select all the products which have price BETWEEN 1.0 AND 2.0 AND quantity BETWEEN 1000 AND 2000
+    println("SELECTING : all the products which have price BETWEEN 1.0 AND 2.0 AND quantity BETWEEN 1000 AND 2000")
+    val results5 = spark.sql(s"SELECT * FROM $globalTempViewName WHERE (price BETWEEN 1 AND 2) AND (quantity BETWEEN 1000 AND 2000)")
+    println("SHOWING: all the products which have price BETWEEN 1.0 AND 2.0 AND quantity BETWEEN 1000 AND 2000")
+    results5.show()
+
+    //Select all the products whose product code is null
+    println("SELECTING : all the products whose product code is null")
+    val results6 = spark.sql(s"SELECT * FROM $globalTempViewName WHERE productCode IS NULL")
+    println("SHOWING: all the products whose product code is null")
+    results6.show()
+
+    //Select all the products whose name starts with Pen, ordered by price in descending order.
+    println("SELECTING : all the products whose name starts with Pen, ordered by price in descending order")
+    val results7 = spark.sql(s"SELECT * FROM $globalTempViewName WHERE name LIKE 'Pen%' ORDER BY price DESC")
+    println("SHOWING: all the products whose name starts with Pen, ordered by price in descending order")
+    results7.show()
+
+    //Select all the products whose name starts with Pen, ordered by price descending and quantity ascending.
+    println("SELECTING : all the products whose name starts with Pen, ordered by price descending and quantity ascending")
+    val results8 = spark.sql(s"SELECT * FROM $globalTempViewName WHERE name LIKE 'Pen%' ORDER BY price DESC, quantity ASC")
+    println("SHOWING: all the products whose name starts with Pen, ordered by price descending and quantity ascending")
+    results8.show()
+
+    //Select top 2 products by price
+    println("SELECTING : top 2 products by price")
+    val results9 = spark.sql(s"SELECT * FROM $globalTempViewName ORDER BY price DESC LIMIT 2")
+    println("SHOWING: top 2 products by price")
+    results9.show()
+
+    //Select all the columns from the product table with output headers: productID AS ID, code AS Code, name AS Description, price AS 'Unit Price'
+    println("SELECTING : all the columns from the product table with output headers ID, Code, Description, Unit_Price")
+    val results10 = spark.sql(s"SELECT productID AS ID, productCode AS Code, name AS Description, price AS Unit_Price FROM $globalTempViewName")
+    println("SHOWING: all the columns from the product table with output headers ID, Code, Description, Unit_Price")
+    results10.show()
+
+    //Select code and name, both separated by '-', with the header name ProductDescription
+    println("SELECTING : code and name, both separated by '-', with the header name ProductDescription")
+    val results11 = spark.sql(s"SELECT CONCAT(productCode,'-',name) AS ProductDescription FROM $globalTempViewName")
+    println("SHOWING: code and name, both separated by '-', with the header name ProductDescription")
+    results11.show()
+
+    //Select all distinct prices
+    println("SELECTING : all distinct prices")
+    val results12 = spark.sql(s"SELECT DISTINCT price AS Distinct_Price FROM $globalTempViewName")
+    println("SHOWING: all distinct prices")
+    results12.show()
+
+    //Select distinct price and name combination
+    println("SELECTING : distinct price and name combination")
+    val results13 = spark.sql(s"SELECT DISTINCT price, name FROM $globalTempViewName")
+    println("SHOWING: distinct price and name combination")
+    results13.show()
+
+    //Select all price data sorted by both code and productID combination
+    println("SELECTING : all price data sorted by both code and productID combination")
+    val results15 = spark.sql(s"SELECT * FROM $globalTempViewName ORDER BY productCode, productID")
+    println("SHOWING: all price data sorted by both code and productID combination")
+    results15.show()
+
+
+    //Count number of products for each code
+    println("SELECTING : count of products for each code")
+    val results16 = spark.sql(s"SELECT productCode, COUNT(*) AS product_count FROM $globalTempViewName GROUP BY productCode")
+    println("SHOWING: count of products for each code")
+    results16.show()
+
+    //save dataframe to hive table in orc format
+    writeDataFrameToHiveTable(productDF, SaveMode.Overwrite, "orc", "product_orc_table")
+
+    //save dataframe to hive table in parquet format
+    writeDataFrameToHiveTable(productDF, SaveMode.Overwrite, "parquet", "product_parquet_table")
+
+  }
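+
+  //A sketch for the Part Two queries (products joined with their suppliers). The method name,
+  //parameter names and local names are illustrative assumptions, not part of the original solution;
+  //it assumes problem_scenario1 has already registered the global_temp.products view. The supplier
+  //CSVs under insight_data are comma-delimited with whitespace-padded values, hence the TRIM calls.
+  def problem_scenario1_part_two(productsSuppliersPath: String, suppliersPath: String): Unit = {
+    val productsSuppliersDF = spark.read.option("header", "true").csv(productsSuppliersPath).toDF("productID", "supplierID")
+    val suppliersDF = spark.read.option("header", "true").csv(suppliersPath).toDF("supplierID", "supplierName", "phone")
+    productsSuppliersDF.createOrReplaceTempView("products_suppliers")
+    suppliersDF.createOrReplaceTempView("suppliers")
+
+    //each product, its price and its supplier name, for products priced under 0.6
+    val cheapProductsWithSuppliers = spark.sql(
+      """SELECT p.name AS product, p.price, TRIM(s.supplierName) AS supplier
+        |FROM global_temp.products p
+        |JOIN products_suppliers ps ON p.productID = CAST(TRIM(ps.productID) AS INT)
+        |JOIN suppliers s ON TRIM(ps.supplierID) = TRIM(s.supplierID)
+        |WHERE p.price < 0.6""".stripMargin)
+    cheapProductsWithSuppliers.show()
+
+    //every product/supplier combination with its price (a product may have several suppliers)
+    val pricePerSupplier = spark.sql(
+      """SELECT p.name AS product, TRIM(s.supplierName) AS supplier, p.price
+        |FROM global_temp.products p
+        |JOIN products_suppliers ps ON p.productID = CAST(TRIM(ps.productID) AS INT)
+        |JOIN suppliers s ON TRIM(ps.supplierID) = TRIM(s.supplierID)""".stripMargin)
+    pricePerSupplier.show()
+  }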
+  def writeDataFrameToHiveTable(inputDF: DataFrame, saveMode: SaveMode, dataFormat: String, hiveTableName: String) = {
+    println(s"Starting to write dataframe to hive table with the following data format $dataFormat and hive table name: $hiveTableName")
+    //match cases: json, parquet, jdbc, orc, libsvm, csv, text
+    dataFormat match {
+      case "json" => inputDF.write.mode(saveMode).format("json").saveAsTable(hiveTableName)
+      case "parquet" => inputDF.write.mode(saveMode).format("parquet").saveAsTable(hiveTableName)
+      case "jdbc" => inputDF.write.mode(saveMode).format("jdbc").saveAsTable(hiveTableName)
+      case "orc" => inputDF.write.mode(saveMode).format("orc").saveAsTable(hiveTableName)
+      case "csv" => inputDF.write.mode(saveMode).format("csv").saveAsTable(hiveTableName)
+      case "text" => inputDF.write.mode(saveMode).format("text").saveAsTable(hiveTableName)
+      case "libsvm" => inputDF.write.mode(saveMode).format("libsvm").saveAsTable(hiveTableName)
+      case _ => println(s"Invalid dataFormat '$dataFormat'. Allowed formats are: json, parquet, jdbc, orc, csv, text or libsvm") // the default, catch-all
+    }
+
+    println(s"End write dataframe to hive table with the following data format $dataFormat and hive table name: $hiveTableName")
+
+  }
+
+
+}
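+
+//Example usage sketch (illustrative; the repository's actual entry point is the
+//run_problem_scenario_one driver): run with the VM options from the README, e.g.
+//-Dspark.master=local, and point the method at the tab-delimited products file:
+//  val scenario = new problem_scenario_One()
+//  scenario.problem_scenario1("insight_data/products.csv")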