diff --git a/examples/SQL+DF-Examples/retail-analytics/notebooks/python/Retail Analytics.ipynb b/examples/SQL+DF-Examples/retail-analytics/notebooks/python/retail-analytic.ipynb
similarity index 81%
rename from examples/SQL+DF-Examples/retail-analytics/notebooks/python/Retail Analytics.ipynb
rename to examples/SQL+DF-Examples/retail-analytics/notebooks/python/retail-analytic.ipynb
index 4142b96a0..3ad5717ff 100644
--- a/examples/SQL+DF-Examples/retail-analytics/notebooks/python/Retail Analytics.ipynb
+++ b/examples/SQL+DF-Examples/retail-analytics/notebooks/python/retail-analytic.ipynb
@@ -10,43 +10,38 @@
     "from pyspark.sql import SparkSession\n",
     "from pyspark import broadcast, SparkConf\n",
     "import time\n",
+    "import os\n",
     "\n",
+    "RAPIDS_JAR = os.getenv(\"RAPIDS_JAR\", \"/path/to/your/jars/rapids.jar\")\n",
+    "SPARK_MASTER = os.getenv(\"SPARK_MASTER_URL\", \"spark://ip:port\")\n",
+    "print(\"RAPIDS_JAR: {}\".format(RAPIDS_JAR))\n",
     "if \"sc\" in globals():\n",
     "    sc.stop()\n",
     "\n",
     "### Configure the parameters based on your dataproc cluster ###\n",
     "conf = SparkConf().setAppName(\"Retail Analytics\")\n",
-    "conf.set(\"spark.plugins\", \"com.nvidia.spark.SQLPlugin\")\n",
-    "conf.set(\"spark.executor.instances\", \"8\")\n",
+    "conf.setMaster(SPARK_MASTER)\n",
+    "conf.set(\"spark.driver.extraClassPath\", RAPIDS_JAR)\n",
+    "conf.set(\"spark.executor.extraClassPath\", RAPIDS_JAR)\n",
+    "conf.set(\"spark.jars\", RAPIDS_JAR)\n",
+    "conf.set(\"spark.executor.instances\", \"1\")\n",
     "conf.set(\"spark.executor.cores\", \"4\")\n",
     "conf.set(\"spark.task.resource.gpu.amount\", \"0.25\")\n",
     "conf.set(\"spark.rapids.sql.concurrentGpuTasks\", \"2\")\n",
-    "conf.set(\"spark.executor.memory\", \"8192m\")\n",
-    "conf.set(\"spark.sql.files.maxPartitionBytes\", \"512m\")\n",
+    "conf.set(\"spark.executor.memory\", \"4g\")\n",
+    "conf.set(\"spark.sql.files.maxPartitionBytes\", \"128m\")\n",
     "conf.set(\"spark.executor.resource.gpu.amount\", \"1\")\n",
-    "conf.set(\"spark.rapids.memory.pinnedPool.size\", \"4096m\")\n",
-    "conf.set(\"spark.executor.memoryOverhead\", \"4915m\")\n",
-    "conf.set(\"spark.sql.broadcastTimeout\", \"700\")\n",
-    "conf.set(\"spark.sql.shuffle.partitions\", \"500\")\n",
-    "conf.set(\"spark.driver.maxResultSize\", \"8g\")\n",
-    "conf.set(\"spark.driver.memory\", \"10g\")\n",
+    "conf.set(\"spark.rapids.memory.pinnedPool.size\", \"2048m\")\n",
+    "conf.set(\"spark.executor.memoryOverhead\", \"4096m\")\n",
     "conf.set(\"spark.dynamicAllocation.enabled\", \"false\")\n",
-    "conf.set(\"spark.sql.adaptive.enabled\", \"true\")\n",
-    "conf.set(\"spark.sql.autoBroadcastJoinThreshold\", \"300M\")\n",
-    "conf.set(\"spark.rapids.memory.host.spillStorageSize\", \"4g\")\n",
-    "conf.set(\"spark.rapids.sql.multiThreadedRead.numThreads\", \"40\")\n",
-    "conf.set(\"spark.rapids.sql.castDecimalToString.enabled\",True)\n",
+    "conf.set(\"spark.rapids.sql.format.json.read.enabled\",True)\n",
     "conf.set(\"spark.rapids.sql.castStringToTimestamp.enabled\",True)\n",
     "conf.set(\"spark.rapids.sql.expression.PercentRank\",False)\n",
     "conf.set(\"spark.rapids.sql.castDecimalToString.enabled\",True)\n",
-    "conf.set(\"spark.scheduler.minRegisteredResourcesRatio\", \"0.0\")\n",
-    "conf.set(\"spark.sql.adaptive.advisoryPartitionSizeInBytes\", \"128M\")\n",
-    "conf.set(\"spark.sql.adaptive.coalescePartitions.minPartitionNum\", \"1\")\n",
-    "conf.set(\"spark.yarn.executor.launch.excludeOnFailure.enabled\",True)\n",
-    "conf.set(\"spark.rapids.sql.format.json.read.enabled\",True)\n",
-    "conf.set(\"spark.rapids.sql.explain\",None)\n",
"conf.set(\"spark.rapids.sql.explain\",None)\n", + "conf.set(\"spark.rapids.sql.hasExtendedYearValues\",False)\n", "conf.set(\"spark.rapids.sql.enabled\",True)\n", - " \n", + "conf.set(\"spark.plugins\", \"com.nvidia.spark.SQLPlugin\")\n", + "conf.set(\"spark.rapids.sql.allowMultipleJars\", \"ALWAYS\")\n", "\n", "spark = SparkSession.builder \\\n", " .config(conf=conf) \\\n", @@ -63,7 +58,7 @@ "source": [ "import os\n", "# You need to update these to your real paths!\n", - "dataRoot = os.getenv(\"DATA_ROOT\", 'gs:///data')" + "dataRoot = os.getenv(\"DATA_ROOT\", 'path/to/your/datasets')" ] }, { @@ -93,22 +88,22 @@ " return spark.read.format(format).load(file_path)\n", "\n", "# read sales data\n", - "sales_df = read_data(spark, \"csv\", dataRoot+\"/raw/sales/\")\n", + "sales_df = read_data(spark, \"csv\", dataRoot+\"/sales/\")\n", "\n", "# read stock data\n", - "stock_df = read_data(spark, \"json\", dataRoot+\"/raw/stock/\")\n", + "stock_df = read_data(spark, \"json\", dataRoot+\"/stock/\")\n", "\n", "# read supplier data\n", - "supplier_df = read_data(spark, \"json\", dataRoot+\"/raw/supplier/\")\n", + "supplier_df = read_data(spark, \"json\", dataRoot+\"/supplier/\")\n", "\n", "# read customer data\n", - "customer_df = read_data(spark, \"csv\", dataRoot+\"/raw/customer/\")\n", + "customer_df = read_data(spark, \"csv\", dataRoot+\"/customer/\")\n", "\n", "# read market data\n", - "market_df = read_data(spark, \"csv\", dataRoot+\"/raw/market/\")\n", + "market_df = read_data(spark, \"csv\", dataRoot+\"/market/\")\n", "\n", "# read logistic data\n", - "logistic_df = read_data(spark, \"csv\", dataRoot+\"/raw/logistic/\")\n", + "logistic_df = read_data(spark, \"csv\", dataRoot+\"/logistic/\")\n", "\n", "\n", "# data cleaning\n", @@ -158,7 +153,8 @@ "data_int = sales_df.join(stock_df, \"product_name\",\"leftouter\").join(supplier_df, \"product_name\",\"leftouter\").join(market_df, \"product_name\",\"leftouter\").join(logistic_df, \"product_name\",\"leftouter\").join(customer_df, \"customer_id\",\"leftouter\") \n", "\n", "# write the cleaned data\n", - "data_int.write.format(\"parquet\").save(dataRoot+\"/cleaned/\")\n", + "os.makedirs(dataRoot+\"cleaned/\", exist_ok=True)\n", + "data_int.write.mode(\"overwrite\").format(\"parquet\").save(dataRoot+\"/cleaned/\")\n", "\n", "end = time.time()\n", "\n", @@ -233,8 +229,8 @@ "aggregated_df = grouped_df.agg(sum(\"quantity_in_stock\").alias(\"total_quantity_in_stock\"),avg(\"price\").alias(\"average_price\"),sum(\"quantity_ordered\").alias(\"total_quantity_ordered\"),sum(\"quantity_sold\").alias(\"total_quantity_sold\"),sum(col(\"price\") * col(\"quantity_sold\")).alias(\"total_sales\"),sum(\"prev_sales\").alias(\"total_prev_sales\"),sum(\"next_sales\").alias(\"total_next_sales\"),).sort(desc(\"total_sales\"))\n", "\n", "#WRITE THE AGGREGATES TO DISK\n", - "aggregated_df.write.format(\"parquet\").save(dataRoot+\"/app/data.parquet\")\n", - "total_sales_by_product_location.write.format(\"parquet\").save(dataRoot+\"/app1/data.parquet\")\n", + "aggregated_df.write.mode(\"overwrite\").format(\"parquet\").save(dataRoot+\"/app/data.parquet\")\n", + "total_sales_by_product_location.write.mode(\"overwrite\").format(\"parquet\").save(dataRoot+\"/app1/data.parquet\")\n", "\n", "end = time.time()\n", "\n", @@ -243,19 +239,12 @@ }, { "cell_type": "code", - "execution_count": 9, + "execution_count": 5, "metadata": {}, "outputs": [], "source": [ "spark.stop()" ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [] 
} ], "metadata": { @@ -274,7 +263,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.8.5" + "version": "3.9.19" } }, "nbformat": 4, diff --git a/examples/SQL+DF-Examples/retail-analytics/notebooks/python/Data Generator.ipynb b/examples/SQL+DF-Examples/retail-analytics/notebooks/python/retail-datagen.ipynb similarity index 75% rename from examples/SQL+DF-Examples/retail-analytics/notebooks/python/Data Generator.ipynb rename to examples/SQL+DF-Examples/retail-analytics/notebooks/python/retail-datagen.ipynb index dc2b2b4bb..9d4e30b39 100644 --- a/examples/SQL+DF-Examples/retail-analytics/notebooks/python/Data Generator.ipynb +++ b/examples/SQL+DF-Examples/retail-analytics/notebooks/python/retail-datagen.ipynb @@ -19,12 +19,12 @@ "import random\n", "\n", "# You need to update these to your real paths!\n", - "dataRoot = os.getenv(\"DATA_ROOT\", 'gs://bucket-name/data/raw/')" + "dataRoot = os.getenv(\"DATA_ROOT\", '/path/to/your/datasets')" ] }, { "cell_type": "code", - "execution_count": 2, + "execution_count": null, "metadata": {}, "outputs": [], "source": [ @@ -32,19 +32,20 @@ "def generate_data(i):\n", " sales_id = \"s_{}\".format(i)\n", " product_name = \"Product_{}\".format(i)\n", - " price = random.uniform(1,100)\n", - " quantity_sold = random.randint(1,100)\n", + " price = random.uniform(1,10)\n", + " quantity_sold = random.randint(1,10)\n", " date_of_sale = \"2022-{}-{}\".format(random.randint(1,12), random.randint(1,28))\n", - " customer_id = \"c_{}\".format(random.randint(1,1000000))\n", + " customer_id = \"c_{}\".format(random.randint(1,10000))\n", " return (sales_id, product_name, price, quantity_sold, date_of_sale, customer_id)\n", "\n", "with mp.Pool(mp.cpu_count()) as p:\n", - " sales_data = p.map(generate_data, range(100000000))\n", + " sales_data = p.map(generate_data, range(1000000))\n", " sales_data = list(sales_data)\n", " \n", "print(\"write to gcs started\")\n", "sales_df = pd.DataFrame(sales_data, columns=[\"sales_id\", \"product_name\", \"price\", \"quantity_sold\", \"date_of_sale\", \"customer_id\"])\n", - "sales_df.to_csv(dataRoot+\"sales/data.csv\", index=False, header=True)\n", + "os.makedirs(dataRoot+\"/sales/\", exist_ok=True)\n", + "sales_df.to_csv(dataRoot+\"/sales/data.csv\", index=False, header=True)\n", "print(\"Write to gcs completed\")" ] }, @@ -58,17 +59,18 @@ " product_name = \"Product_{}\".format(i)\n", " shelf_life = random.randint(1,365)\n", " contains_promotion = \"{} % off\".format(random.randint(0,10))\n", - " quantity_in_stock = random.randint(1,1000)\n", + " quantity_in_stock = random.randint(1,100)\n", " location = \"Location_{}\".format(random.randint(1,100))\n", " date_received = \"2022-{}-{}\".format(random.randint(1,12), random.randint(1,28))\n", " return (product_name,shelf_life,contains_promotion,quantity_in_stock, location, date_received)\n", "\n", "with mp.Pool(mp.cpu_count()) as p:\n", - " stock_data = p.map(generate_data, range(5000000))\n", + " stock_data = p.map(generate_data, range(50000))\n", " stock_data = list(stock_data)\n", " \n", "stock_df = pd.DataFrame(stock_data, columns=[\"product_name\",\"shelf_life\",\"contains_promotion\",\"quantity_in_stock\", \"location\", \"date_received\"])\n", - "stock_df.to_json(dataRoot+\"stock/stock.json\", orient='records')\n", + "os.makedirs(dataRoot+\"/stock/\", exist_ok=True)\n", + "stock_df.to_json(dataRoot+\"/stock/stock.json\", orient='records')\n", "print(\"Write to gcs completed\")" ] }, @@ -81,17 +83,18 @@ "def generate_data(i):\n", " 
sup_id = \"s_{}\".format(i)\n", " product_name = \"Product_{}\".format(i)\n", - " quantity_ordered = random.randint(1,1000)\n", - " price = random.uniform(1,100)\n", + " quantity_ordered = random.randint(1,100)\n", + " price = random.uniform(1,10)\n", " date_ordered = \"2022-{}-{}\".format(random.randint(1,12), random.randint(1,28))\n", " return (sup_id,product_name, quantity_ordered, price, date_ordered)\n", "\n", "with mp.Pool(mp.cpu_count()) as p:\n", - " supplier_data = p.map(generate_data, range(5000000))\n", + " supplier_data = p.map(generate_data, range(50000))\n", " supplier_data = list(supplier_data)\n", " \n", "supplier_df = pd.DataFrame(supplier_data, columns=[\"sup_id\",\"product_name\", \"quantity_ordered\", \"price\", \"date_ordered\"])\n", - "supplier_df.to_json(dataRoot+\"supplier/supplier.json\", orient='records')\n", + "os.makedirs(dataRoot+\"/supplier/\", exist_ok=True)\n", + "supplier_df.to_json(dataRoot+\"/supplier/supplier.json\", orient='records')\n", "print(\"Write to gcs completed\")" ] }, @@ -111,11 +114,12 @@ " return (customer_id,customer_name, age, gender, purchase_history, contact_info)\n", "\n", "with mp.Pool(mp.cpu_count()) as p:\n", - " customer_data = p.map(generate_data, range(100000))\n", + " customer_data = p.map(generate_data, range(1000))\n", " customer_data = list(customer_data)\n", " \n", "customer_df = pd.DataFrame(customer_data, columns=[\"customer_id\",\"customer_name\", \"age\", \"gender\", \"purchase_history\", \"contact_info\"])\n", - "customer_df.to_csv(dataRoot+\"customer/customer.csv\", index=False,header=True)\n", + "os.makedirs(dataRoot+\"/customer/\", exist_ok=True)\n", + "customer_df.to_csv(dataRoot+\"/customer/customer.csv\", index=False,header=True)\n", "print(\"Write to gcs completed\")" ] }, @@ -133,11 +137,12 @@ " return (product_name, competitor_price, sales_trend, demand_forecast)\n", "\n", "with mp.Pool(mp.cpu_count()) as p:\n", - " market_data = p.map(generate_data, range(50000000))\n", + " market_data = p.map(generate_data, range(500000))\n", " market_data = list(market_data)\n", " \n", "market_df = pd.DataFrame(market_data, columns=[\"product_name\", \"competitor_price\", \"sales_trend\", \"demand_forecast\"])\n", - "market_df.to_csv(dataRoot+\"market/market.csv\", index=False,header=True)\n", + "os.makedirs(dataRoot+\"/market/\", exist_ok=True)\n", + "market_df.to_csv(dataRoot+\"/market/market.csv\", index=False,header=True)\n", "print(\"Write to gcs completed\")" ] }, @@ -149,17 +154,18 @@ "source": [ "def generate_data(i):\n", " product_name = \"Product_{}\".format(i)\n", - " shipping_cost = random.uniform(1,100)\n", - " transportation_cost = random.uniform(1,100)\n", - " warehouse_cost = random.uniform(1,100)\n", + " shipping_cost = random.uniform(1,10)\n", + " transportation_cost = random.uniform(1,10)\n", + " warehouse_cost = random.uniform(1,10)\n", " return (product_name, shipping_cost, transportation_cost, warehouse_cost)\n", "\n", "with mp.Pool(mp.cpu_count()) as p:\n", - " logistic_data = p.map(generate_data, range(50000000))\n", + " logistic_data = p.map(generate_data, range(500000))\n", " logistic_data = list(logistic_data)\n", " \n", "logistic_df = pd.DataFrame(logistic_data, columns=[\"product_name\", \"shipping_cost\", \"transportation_cost\", \"warehouse_cost\"])\n", - "logistic_df.to_csv(dataRoot+\"logistic/logistic.csv\", index=False,header=True)\n", + "os.makedirs(dataRoot+\"/logistic/\", exist_ok=True)\n", + "logistic_df.to_csv(dataRoot+\"/logistic/logistic.csv\", index=False,header=True)\n", 
"print(\"Write to gcs completed\")" ] } @@ -180,7 +186,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.8.5" + "version": "3.9.19" } }, "nbformat": 4,