Skip to content

Commit

Permalink
inserindo kafka
Browse files Browse the repository at this point in the history
  • Loading branch information
GCarin1 committed Apr 7, 2021
1 parent 440a8d1 commit 4737c24
Show file tree
Hide file tree
Showing 9 changed files with 195 additions and 13 deletions.
38 changes: 38 additions & 0 deletions .docker/kafka/docker-compose.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
version: "3"

services:

zookeeper:
image: confluentinc/cp-zookeeper:latest
environment:
ZOOKEEPER_CLIENT_PORT: 2181
extra_hosts:
- "host.docker.internal:172.17.0.1"

kafka:
image: confluentinc/cp-kafka:latest
depends_on:
- zookeeper
ports:
- "9092:9092"
- "9094:9094"
environment:
KAFKA_BROKER_ID: 1
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
KAFKA_LISTENERS: INTERNAL://:9092,OUTSIDE://:9094
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:9092,OUTSIDE://host.docker.internal:9094
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,OUTSIDE:PLAINTEXT
extra_hosts:
- "host.docker.internal:172.17.0.1"

kafka-topics-generator:
image: confluentinc/cp-kafka:latest
depends_on:
- kafka
command: >
bash -c
"sleep 5s &&
kafka-topics --create --topic=route.new-direction --if-not-exists --bootstrap-server=kafka:9092 &&
kafka-topics --create --topic=route.new-position --if-not-exists --bootstrap-server=kafka:9092"
4 changes: 4 additions & 0 deletions .env
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
KafkaReadTopic=route.new-direction
KafkaProduceTopic=route.new-position
KafkaBootstrapServers=host.docker.internal:9094
KafkaConsumerGroupId=simulator
39 changes: 39 additions & 0 deletions Infra/kafka/consumer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package kafka

import (
"fmt"
"log"
"os"

ckafka "github.com/confluentinc/confluent-kafka-go/kafka"
)

type KafkaConsumer struct {
MsgChan chan *ckafka.Message
}

func NewKafkaConsumer(msgChan chan *ckafka.Message) *KafkaConsumer {
return &KafkaConsumer{
MsgChan: msgChan,
}
}

func (k *KafkaConsumer) Consume() {
configMap := &ckafka.ConfigMap{
"bootstrap.servers": os.Getenv("KafkaBootstrapServers"),
"group.id": os.Getenv("KafkaConsumerGroupId"),
}
c, err := ckafka.NewConsumer(configMap)
if err != nil {
log.Fatalf("error consuming kafka message:" + err.Error())
}
topics := []string{os.Getenv("KafkaReadTopic")}
c.SubscribeTopics(topics, nil)
fmt.Println("Kafka consumer has been started")
for {
msg, err := c.ReadMessage(-1)
if err == nil {
k.MsgChan <- msg
}
}
}
31 changes: 31 additions & 0 deletions Infra/kafka/producer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package kafka

import (
"log"
"os"

ckafka "github.com/confluentinc/confluent-kafka-go/kafka"
)

func NewKafkaProducer() *ckafka.Producer {
configMap := &ckafka.ConfigMap{
"bootstrap.servers": os.Getenv("KafkaBootstrapServers"),
}
p, err := ckafka.NewProducer(configMap)
if err != nil {
log.Println(err.Error())
}
return p
}
func Publish(msg string, topic string, producer *ckafka.Producer) error {
message := &ckafka.Message{
TopicPartition: ckafka.TopicPartition{Topic: &topic, Partition: ckafka.PartitionAny},
Value: []byte(msg),
}
err := producer.Produce(message, nil)
if err != nil {
return err

}
return nil
}
27 changes: 27 additions & 0 deletions application/kafka/produce.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package kafka

import (
"encoding/json"
"log"
"os"
"time"

"github.com/GCarin1/CodeDelivery.git/Infra/kafka"
route2 "github.com/GCarin1/CodeDelivery.git/application/route"
ckafka "github.com/confluentinc/confluent-kafka-go/kafka"
)

func Produce(msg *ckafka.Message) {
producer := kafka.NewKafkaProducer()
route := route2.NewRoute()
json.Unmarshal(msg.Value, &route)
route.LoadPositions()
positions, err := route.ExportJsonPositions()
if err != nil {
log.Println(err.Error())
}
for _, p := range positions {
kafka.Publish(p, os.Getenv("KafkaProduceTopic"), producer)
time.Sleep(time.Millisecond * 500)
}
}
31 changes: 18 additions & 13 deletions application/route/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,26 +10,32 @@ import (
)

type Route struct {
ID string `json:"routeId"`
ClientID string `json:"clientID"`
Positions []Positions `json:"position"`
ID string `json:"routeId"`
ClientID string `json:"clientId"`
Positions []Position `json:"position"`
}
type Positions struct {

type Position struct {
Lat float64 `json:"lat"`
Long float64 `json:"long"`
}

type PartialRoutePosition struct {
ID string `json:"routeId"`
ClientID string `json:"clientId"`
Positions []float64 `json:"position"`
Finished bool `json:"finished"`
ID string `json:"routeId"`
ClientID string `json:"clientId"`
Position []float64 `json:"position"`
Finished bool `json:"finished"`
}

func (r *Route) loadPositions() error {
func NewRoute() *Route {
return &Route{}
}

func (r *Route) LoadPositions() error {
if r.ID == "" {
return errors.New("route id not informed")
}
f, err := os.Open("destinations" + r.ID + ".txt")
f, err := os.Open("destinations/" + r.ID + ".txt")
if err != nil {
return err
}
Expand All @@ -45,7 +51,7 @@ func (r *Route) loadPositions() error {
if err != nil {
return nil
}
r.Positions = append(r.Positions, Positions{
r.Positions = append(r.Positions, Position{
Lat: lat,
Long: long,
})
Expand All @@ -57,11 +63,10 @@ func (r *Route) ExportJsonPositions() ([]string, error) {
var route PartialRoutePosition
var result []string
total := len(r.Positions)

for k, v := range r.Positions {
route.ID = r.ID
route.ClientID = r.ClientID
route.Positions = []float64{v.Lat, v.Long}
route.Position = []float64{v.Lat, v.Long}
route.Finished = false
if total-1 == k {
route.Finished = true
Expand Down
5 changes: 5 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
module github.com/GCarin1/CodeDelivery.git

go 1.16

require (
github.com/confluentinc/confluent-kafka-go v1.6.1
github.com/joho/godotenv v1.3.0
)
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
github.com/confluentinc/confluent-kafka-go v1.6.1 h1:YxM/UtMQ2vgJX2gIgeJFUD0ANQYTEvfo4Cs4qKUlmGE=
github.com/confluentinc/confluent-kafka-go v1.6.1/go.mod h1:u2zNLny2xq+5rWeTQjFHbDzzNuba4P1vo31r9r4uAdg=
github.com/joho/godotenv v1.3.0 h1:Zjp+RcGpHhGlrMbJzXTrZZPrWj+1vfm90La1wgB6Bhc=
github.com/joho/godotenv v1.3.0/go.mod h1:7hK45KPybAkOC6peb+G5yklZfMxEjkZhHbwpqxOKXbg=
29 changes: 29 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package main

import (
"fmt"
"log"

"github.com/GCarin1/CodeDelivery.git/Infra/kafka"
kafka2 "github.com/GCarin1/CodeDelivery.git/application/kafka"
ckafka "github.com/confluentinc/confluent-kafka-go/kafka"
"github.com/joho/godotenv"
)

func init() {
err := godotenv.Load()
if err != nil {
log.Fatal("error loading .env file")
}
}

func main() {
msgChan := make(chan *ckafka.Message)
consumer := kafka.NewKafkaConsumer(msgChan)
go consumer.Consume()

for msg := range msgChan {
fmt.Println(string(msg.Value))
go kafka2.Produce(msg)
}
}

0 comments on commit 4737c24

Please sign in to comment.