Skip to content

Commit

Permalink
Merge pull request #27 from sharath-sg2706/spark-3.3
Browse files Browse the repository at this point in the history
changed the connectionUri to Uri-3.3
  • Loading branch information
badrinathpatchikolla authored Nov 17, 2022
2 parents b0a14df + feb6edb commit 60f6047
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 8 deletions.
49 changes: 44 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,29 +3,30 @@
[![Build Status](https://github.com/music-of-the-ainur/mongodb.almaren/actions/workflows/mongodb-almaren-githubactions.yml/badge.svg)](https://github.com/music-of-the-ainur/mongodb.almaren/actions/workflows/mongodb-almaren-githubactions.yml)

To add Mongodb Almaren dependency to your sbt build:

```
libraryDependencies += "com.github.music-of-the-ainur" %% "mongodb-almaren" % "0.0.7-3.3"
libraryDependencies += "org.mongodb.spark" % "mongo-spark-connector" % "10.0.4"
```

To run in spark-shell:

```
spark-shell --master local[*] --packages "com.github.music-of-the-ainur:almaren-framework_2.12:0.9.8-3.3,com.github.music-of-the-ainur:mongodb-almaren_2.12:0.0.7-3.3,org.mongodb.spark:mongo-spark-connector:10.0.4"
```


MongoDB Connector is available in
[Maven Central](https://mvnrepository.com/artifact/com.github.music-of-the-ainur)
MongoDB Connector is available in [Maven Central](https://mvnrepository.com/artifact/com.github.music-of-the-ainur)
repository.

| versions | Connector Artifact |
| version | Connector Artifact |
|----------------------------|----------------------------------------------------------------|
| Spark 3.3.x and scala 2.12 | `com.github.music-of-the-ainur:mongodb-almaren_2.12:0.0.7-3.3` |
| Spark 3.2.x and scala 2.12 | `com.github.music-of-the-ainur:mongodb-almaren_2.12:0.0.7-3.2` |
| Spark 3.1.x and scala 2.12 | `com.github.music-of-the-ainur:mongodb-almaren_2.12:0.0.7-3.1` |
| Spark 2.4.x and scala 2.12 | `com.github.music-of-the-ainur:mongodb-almaren_2.12:0.0.7-2.4` |
| Spark 2.4.x and scala 2.11 | `com.github.music-of-the-ainur:mongodb-almaren_2.11:0.0.7-2.4` |


## Source and Target

Connector was implemented using: [https://github.com/mongodb/mongo-spark](https://github.com/mongodb/mongo-spark).
Expand Down Expand Up @@ -63,7 +64,25 @@ import com.github.music.of.the.ainur.almaren.Almaren
import com.github.music.of.the.ainur.almaren.builder.Core.Implicit
import com.github.music.of.the.ainur.almaren.mongodb.MongoDb.MongoImplicit

almaren.builder.sourceMongoDb("localhost","foo","bar",None,None,Some("srv"))
almaren.builder.sourceMongoDbUri("localhost","foo","bar",None,None,Some("srv"))
```

Parameters for Uri:

| Parameters | Description |
|------------|-------------------------------------------------------------------------------------------------------------------|
| uri | mongodb://localhost:27017/foo |
| collection | bar |
| options | extra connector options |

#### For Connection Uri Type Mongo

```scala
import com.github.music.of.the.ainur.almaren.Almaren
import com.github.music.of.the.ainur.almaren.builder.Core.Implicit
import com.github.music.of.the.ainur.almaren.mongodb.MongoDb.MongoImplicit

almaren.builder.sourceMongoDb("mongodb://localhost:27017/foo","bar")
```

### Target
Expand Down Expand Up @@ -103,3 +122,23 @@ import com.github.music.of.the.ainur.almaren.mongodb.MongoDb.MongoImplicit

almaren.builder.targetMongoDb("localhost","foo","bar",None,None,Some("srv"), saveMode = SaveMode.Overwrite)
```

Parameters for Uri:

| Parameters | Description |
|------------|-------------------------------------------------------------------------------------------------------------------|
| uri | mongodb://localhost:27017/foo |
| collection | bar |
| options | extra connector options |
| saveMode | SaveMode.Overwrite |


#### For Connection Uri Type Mongo

```scala
import com.github.music.of.the.ainur.almaren.Almaren
import com.github.music.of.the.ainur.almaren.builder.Core.Implicit
import com.github.music.of.the.ainur.almaren.mongodb.MongoDb.MongoImplicit

almaren.builder.targetMongoDbUri("mongodb://localhost:27017/foo","bar", saveMode = SaveMode.Overwrite)
```
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,21 @@ private[almaren] case class SourceMongoDb(
.load()
}
}
private[almaren] case class SourceMongoDbx(
uri: String,
collection: String,
options: Map[String, String]) extends Source {

def source(df: DataFrame): DataFrame = {
logger.info(s" collection:{$collection}, options:{$options}")

val params = Map("connection.uri" -> uri, "collection" -> "collection") ++ options

df.sparkSession.read.format("mongodb")
.options(params)
.load()
}
}

private[almaren] case class TargetMongoDb(
hosts: String,
Expand Down Expand Up @@ -60,14 +74,39 @@ private[almaren] case class TargetMongoDb(
df
}

}
private[almaren] case class TargetMongoDbx(
uri: String,
collection: String,
options:Map[String,String],
saveMode:SaveMode) extends Target {

def target(df: DataFrame): DataFrame = {
logger.info(s"collection:{$collection}, options:{$options}, saveMode:{$saveMode}")

val params = Map("connection.uri" -> uri, "collection" -> "collection") ++ options

df.write.format("mongodb")
.options(params)
.mode(saveMode)
.save
df
}

}

private[almaren] trait MongoDbConnector extends Core {
def targetMongoDb(hosts: String,database: String,collection: String,user:Option[String] = None,password:Option[String] = None,stringPrefix:Option[String] = None,options:Map[String,String] = Map(),saveMode:SaveMode = SaveMode.ErrorIfExists): Option[Tree] =
TargetMongoDb(hosts,database,collection,user,password,stringPrefix,options,saveMode)

def targetMongoDbUri(uri: String, collection: String, options: Map[String, String] = Map(), saveMode: SaveMode = SaveMode.ErrorIfExists): Option[Tree] =
TargetMongoDbx(uri, collection, options, saveMode)

def sourceMongoDb(hosts: String,database: String,collection: String,user:Option[String] = None,password:Option[String] = None,stringPrefix:Option[String] = None,options:Map[String,String] = Map()): Option[Tree] =
SourceMongoDb(hosts,database,collection,user,password,stringPrefix,options)

def sourceMongoDbUri(uri: String, collection: String, options: Map[String, String] = Map()): Option[Tree] =
SourceMongoDbx(uri, collection, options)
}

object MongoDb {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ import org.apache.spark.sql.{DataFrame, Column}
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.functions._
import org.scalatest.funsuite.AnyFunSuite
class Test extends AnyFunSuite with BeforeAndAfter {

class Test extends AnyFunSuite with BeforeAndAfter {

val almaren = Almaren("App Test")

Expand All @@ -18,7 +19,7 @@ class Test extends AnyFunSuite with BeforeAndAfter {
.config("spark.ui.enabled","false")
.config("spark.sql.shuffle.partitions", "1")
.getOrCreate()

spark.sparkContext.setLogLevel("ERROR")

import spark.implicits._
Expand All @@ -34,6 +35,14 @@ class Test extends AnyFunSuite with BeforeAndAfter {

test(df1,df2,"MongoDB")

//Write Data From MongoDB Using Uri
val df3 = almaren.builder.sourceSql(s"SELECT * FROM $testTable").targetMongoDbUri("mongodb://localhost/test", "movie1", saveMode = SaveMode.Overwrite).batch

// Read Data From MongoDB
val df4 = almaren.builder.sourceMongoDbUri("mongodb://localhost/test", "movie1").batch

test(df1,df2,"MongoDB Connection Uri")

def test(df1: DataFrame, df2: DataFrame, name: String): Unit = {
testCount(df1, df2, name)
testCompare(df1, df2, name)
Expand Down Expand Up @@ -80,4 +89,4 @@ class Test extends AnyFunSuite with BeforeAndAfter {
}


}
}

0 comments on commit 60f6047

Please sign in to comment.