-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathflink-kafka-workshop.zpln
367 lines (367 loc) · 19.1 KB
/
flink-kafka-workshop.zpln
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
{
"paragraphs": [
{
"text": "%md\n# Enriquecimiento de datos en tiempo real\n\nEn este workshop veremos como enriquecer datos de un stream en tiempo real, haciendo uso de Kafka como fuente de datos y una API externa que nos provee la información adicional que necesitamos.",
"user": "anonymous",
"dateUpdated": "2021-04-21T12:52:53+0000",
"progress": 0,
"config": {
"tableHide": false,
"editorSetting": {
"language": "markdown",
"editOnDblClick": true,
"completionKey": "TAB",
"completionSupport": false
},
"colWidth": 6,
"editorMode": "ace/mode/markdown",
"fontSize": 9,
"editorHide": true,
"results": {},
"enabled": false
},
"settings": {
"params": {},
"forms": {}
},
"results": {
"code": "SUCCESS",
"msg": [
{
"type": "HTML",
"data": "<div class=\"markdown-body\">\n<h1>Enriquecimiento de datos en tiempo real</h1>\n<p>En este workshop veremos como enriquecer datos de un stream en tiempo real, haciendo uso de Kafka como fuente de datos y una API externa que nos provee la información adicional que necesitamos.</p>\n\n</div>"
}
]
},
"apps": [],
"runtimeInfos": {},
"progressUpdateIntervalMs": 500,
"jobName": "paragraph_1619009078294_1578410993",
"id": "paragraph_1618494605137_215480015",
"dateCreated": "2021-04-21T12:44:38+0000",
"status": "FINISHED",
"focus": true,
"$$hashKey": "object:1245",
"dateFinished": "2021-04-21T12:52:47+0000",
"dateStarted": "2021-04-21T12:52:47+0000"
},
{
"text": "%md\n# Agenda\n\n- Probar un flujo simple en flink, desde un arreglo de datos\n- Introducción al caso de uso\n- Extender el flujo con una petición remota\n- Crear un stream de datos usando Kafka\n- Consultar los datos enriquecidos desde el stream de Kafka",
"user": "anonymous",
"dateUpdated": "2021-04-21T12:44:38+0000",
"progress": 0,
"config": {
"tableHide": false,
"editorSetting": {
"language": "markdown",
"editOnDblClick": true,
"completionKey": "TAB",
"completionSupport": false
},
"colWidth": 6,
"editorMode": "ace/mode/markdown",
"fontSize": 9,
"editorHide": true,
"results": {},
"enabled": false
},
"settings": {
"params": {},
"forms": {}
},
"results": {
"code": "SUCCESS",
"msg": [
{
"type": "HTML",
"data": "<div class=\"markdown-body\">\n<h1>Agenda</h1>\n<ul>\n<li>Probar un flujo simple en flink, desde un arreglo de datos</li>\n<li>Introducción al caso de uso</li>\n<li>Extender el flujo con una petición remota</li>\n<li>Crear un stream de datos usando Kafka</li>\n<li>Consultar los datos enriquecidos desde el stream de Kafka</li>\n</ul>\n\n</div>"
}
]
},
"apps": [],
"runtimeInfos": {},
"progressUpdateIntervalMs": 500,
"jobName": "paragraph_1619009078295_382200621",
"id": "paragraph_1618579646757_1149671721",
"dateCreated": "2021-04-21T12:44:38+0000",
"status": "READY",
"$$hashKey": "object:1246"
},
{
"text": "%md\n## Flujo de datos en flink\n\nCreemos un stream finito para crear un flujo simple en flink.\n\nEl interprete de Flink para Zepellin tiene 6 formas de uso, destinadas a diferentes propósitos.\n\n%flink - Crea un ambiente de ejecución en Scala (ExecutionEnvironment/StreamExecutionEnvironment/BatchTableEnvironment/StreamTableEnvironment)\n%flink.pyflink - Crea un ambiente de ejecución en Python\n%flink.ipyflink - Crea un ambiente de ejecución en iPython\n%flink.ssql - Crea un ambiente de ejecución de stream sql\n%flink.bsql - Crea un ambiente de ejecución de batch sql\n\nVamos a usar 2 variables que crea Zepellin para el entorno de Flink en Scala (%flink):\n\nsenv (StreamExecutionEnvironment),\nbenv (ExecutionEnvironment)",
"user": "anonymous",
"dateUpdated": "2021-04-21T12:44:38+0000",
"progress": 0,
"config": {
"tableHide": false,
"editorSetting": {
"language": "markdown",
"editOnDblClick": true,
"completionKey": "TAB",
"completionSupport": false
},
"colWidth": 12,
"editorMode": "ace/mode/markdown",
"fontSize": 9,
"editorHide": true,
"results": {},
"enabled": false
},
"settings": {
"params": {},
"forms": {}
},
"results": {
"code": "SUCCESS",
"msg": [
{
"type": "HTML",
"data": "<div class=\"markdown-body\">\n<h2>Flujo de datos en flink</h2>\n<p>Creemos un stream finito para crear un flujo simple en flink.</p>\n<p>El interprete de Flink para Zepellin tiene 6 formas de uso, destinadas a diferentes propósitos.</p>\n<p>%flink - Crea un ambiente de ejecución en Scala (ExecutionEnvironment/StreamExecutionEnvironment/BatchTableEnvironment/StreamTableEnvironment)<br />\n%flink.pyflink - Crea un ambiente de ejecución en Python<br />\n%flink.ipyflink - Crea un ambiente de ejecución en iPython<br />\n%flink.ssql - Crea un ambiente de ejecución de stream sql<br />\n%flink.bsql - Crea un ambiente de ejecución de batch sql</p>\n<p>Vamos a usar 2 variables que crea Zepellin para el entorno de Flink en Scala (%flink):</p>\n<p>senv (StreamExecutionEnvironment),<br />\nbenv (ExecutionEnvironment)</p>\n\n</div>"
}
]
},
"apps": [],
"runtimeInfos": {},
"progressUpdateIntervalMs": 500,
"jobName": "paragraph_1619009078296_512829127",
"id": "paragraph_1618580440345_1717905200",
"dateCreated": "2021-04-21T12:44:38+0000",
"status": "READY",
"$$hashKey": "object:1247"
},
{
"text": "%flink\n\nval data = benv.fromElements(\"lider express\", \"walmart tech\", \"lider super\", \"walmart rules\", \"apache flink\", \"apache beam\")\ndata.flatMap(line => line.split(\"\\\\s\"))\n .map(w => (w, 1))\n .groupBy(0)\n .sum(1)\n .print()",
"user": "anonymous",
"dateUpdated": "2021-04-21T12:44:38+0000",
"progress": 0,
"config": {
"lineNumbers": true,
"editorSetting": {
"language": "scala",
"editOnDblClick": false,
"completionKey": "TAB",
"completionSupport": true
},
"colWidth": 12,
"editorMode": "ace/mode/scala",
"fontSize": 9,
"results": {},
"enabled": true
},
"settings": {
"params": {},
"forms": {}
},
"apps": [],
"runtimeInfos": {},
"progressUpdateIntervalMs": 500,
"jobName": "paragraph_1619009078296_1626794984",
"id": "paragraph_1618580883213_880277500",
"dateCreated": "2021-04-21T12:44:38+0000",
"status": "READY",
"$$hashKey": "object:1248"
},
{
"text": "%md\n\n### Analizemos la salida del primer caso\n\n[Documentación de operadores de Flink](https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/)\n\n- En al línea 3 hemos creado un conjunto de datos a analizar, es un conjunto finito que tiende a comportarse como una dupla \"campo1 campo2\"\n- A este conjunto le definimos un flujo:\n - En la línea 4 definimos que para cada línea del conjunto separamos las palabras y creamos un nuevo conjunto conformado sólo por palabras (eliminamos los espacios en blanco y pasamos de un conjunto de 6 elementos a uno de 12).\n - En la lína 5 definimos que para cada palara se devuelva una dupla (palabra, conteo)\n - En la línea 6 agrupamos por el índice 0 de la dupla (palabra)\n - En la línea 7 hacemos una reducción de dimensiones, sumando por palabra el conteo\n - Por último lazamos el resultado a pantalla",
"user": "anonymous",
"dateUpdated": "2021-04-21T12:53:00+0000",
"progress": 0,
"config": {
"tableHide": false,
"editorSetting": {
"language": "markdown",
"editOnDblClick": true,
"completionKey": "TAB",
"completionSupport": false
},
"colWidth": 12,
"editorMode": "ace/mode/markdown",
"fontSize": 9,
"editorHide": true,
"results": {},
"enabled": false
},
"settings": {
"params": {},
"forms": {}
},
"results": {
"code": "SUCCESS",
"msg": [
{
"type": "HTML",
"data": "<div class=\"markdown-body\">\n<h3>Analizemos la salida del primer caso</h3>\n<p><a href=\"https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/\">Documentación de operadores de Flink</a></p>\n<ul>\n<li>En al línea 3 hemos creado un conjunto de datos a analizar, es un conjunto finito que tiende a comportarse como una dupla “campo1 campo2”</li>\n<li>A este conjunto le definimos un flujo:\n<ul>\n<li>En la línea 4 definimos que para cada línea del conjunto separamos las palabras y creamos un nuevo conjunto conformado sólo por palabras (eliminamos los espacios en blanco y pasamos de un conjunto de 6 elementos a uno de 12).</li>\n<li>En la lína 5 definimos que para cada palara se devuelva una dupla (palabra, conteo)</li>\n<li>En la línea 6 agrupamos por el índice 0 de la dupla (palabra)</li>\n<li>En la línea 7 hacemos una reducción de dimensiones, sumando por palabra el conteo</li>\n<li>Por último lazamos el resultado a pantalla</li>\n</ul>\n</li>\n</ul>\n\n</div>"
}
]
},
"apps": [],
"runtimeInfos": {},
"progressUpdateIntervalMs": 500,
"jobName": "paragraph_1619009078296_651449615",
"id": "paragraph_1618495125350_1979215751",
"dateCreated": "2021-04-21T12:44:38+0000",
"status": "READY",
"$$hashKey": "object:1249"
},
{
"text": "%md\n---",
"user": "anonymous",
"dateUpdated": "2021-04-21T12:44:38+0000",
"progress": 0,
"config": {
"tableHide": false,
"editorSetting": {
"language": "markdown",
"editOnDblClick": true,
"completionKey": "TAB",
"completionSupport": false
},
"colWidth": 12,
"editorMode": "ace/mode/markdown",
"fontSize": 9,
"editorHide": true,
"results": {},
"enabled": false
},
"settings": {
"params": {},
"forms": {}
},
"results": {
"code": "SUCCESS",
"msg": [
{
"type": "HTML",
"data": "<div class=\"markdown-body\">\n<hr />\n\n</div>"
}
]
},
"apps": [],
"runtimeInfos": {},
"progressUpdateIntervalMs": 500,
"jobName": "paragraph_1619009078296_462292156",
"id": "paragraph_1618590660943_1344566511",
"dateCreated": "2021-04-21T12:44:38+0000",
"status": "READY",
"$$hashKey": "object:1250"
},
{
"text": "%md\n### Ejercicio\n\nEn este workshop vamos a trabajar en un concepto muy utilizado en la ingeniería de datos, que es el enriquecimiento de datos.\n\nVamos a tener una fuente de datos que simula el envío de transacciones que se generan al escanear un artículo en cada una da las cajas de una cadena de supermercados. Este mensaje tiene estos campos:\n```\n\"scanItem\": {\n \"tienda\": 1,\n \"codigo_de_barra\": \"ABC-584-000\",\n \"cantidad\": 2,\n \"precio\": 2178\n}\n```\nDada la gran cantidad de transacciones que están pasando, es recomendable que este mensaje sea lo más liviano posible, por lo que si queremos sacar información más entendible debemos crear un job que se ocupe de enriquecer este set de datos.\n\nPara esto vamos a utilizar un servicio que a partir de un código de barras nos entrega información del producto.\n\nEste servicio se encuentra en \"https://kafka-flink-workshop.herokuapp.com/api/v1/products/<codigo_de_barras>\" y al ser llamado con el codigo de barras nos entrega el siguiente mensaje:\n```\n\"producto\": {\n \"codigo_de_barra\": \"ABC-540-000\",\n \"nombre\": \"Juice - Apple Cider\",\n \"departamento\": \"Outdoors\"\n}\n```",
"user": "anonymous",
"dateUpdated": "2021-04-21T12:53:09+0000",
"progress": 0,
"config": {
"tableHide": false,
"editorSetting": {
"language": "markdown",
"editOnDblClick": true,
"completionKey": "TAB",
"completionSupport": false
},
"colWidth": 12,
"editorMode": "ace/mode/markdown",
"fontSize": 9,
"editorHide": true,
"results": {},
"enabled": false
},
"settings": {
"params": {},
"forms": {}
},
"results": {
"code": "SUCCESS",
"msg": [
{
"type": "HTML",
"data": "<div class=\"markdown-body\">\n<h3>Ejercicio</h3>\n<p>En este workshop vamos a trabajar en un concepto muy utilizado en la ingeniería de datos, que es el enriquecimiento de datos.</p>\n<p>Vamos a tener una fuente de datos que simula el envío de transacciones que se generan al escanear un artículo en cada una da las cajas de una cadena de supermercados. Este mensaje tiene estos campos:</p>\n<pre><code>"scanItem": {\n "tienda": 1,\n "codigo_de_barra": "ABC-584-000",\n "cantidad": 2,\n "precio": 2178\n}\n</code></pre>\n<p>Dada la gran cantidad de transacciones que están pasando, es recomendable que este mensaje sea lo más liviano posible, por lo que si queremos sacar información más entendible debemos crear un job que se ocupe de enriquecer este set de datos.</p>\n<p>Para esto vamos a utilizar un servicio que a partir de un código de barras nos entrega información del producto.</p>\n<p>Este servicio se encuentra en “<a href=\"https://kafka-flink-workshop.herokuapp.com/api/v1/products/\">https://kafka-flink-workshop.herokuapp.com/api/v1/products/</a><codigo_de_barras>” y al ser llamado con el codigo de barras nos entrega el siguiente mensaje:</p>\n<pre><code>"producto": {\n "codigo_de_barra": "ABC-540-000",\n "nombre": "Juice - Apple Cider",\n "departamento": "Outdoors"\n}\n</code></pre>\n\n</div>"
}
]
},
"apps": [],
"runtimeInfos": {},
"progressUpdateIntervalMs": 500,
"jobName": "paragraph_1619009078297_149833867",
"id": "paragraph_1618580437320_1516215801",
"dateCreated": "2021-04-21T12:44:38+0000",
"status": "READY",
"$$hashKey": "object:1251"
},
{
"text": "%flink\n\nimport scala.util.parsing.json._\nimport com.google.gson.Gson\nimport scala.io.Source\nimport scala.io.Source.fromURL\nimport java.util.Properties\nimport org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, FlinkKafkaProducer}\nimport org.apache.flink.streaming.util.serialization.SimpleStringSchema\nimport org.apache.flink.api.common.functions.MapFunction\n// documentacion apache kafka connector https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html\n// documentación librería gson https://github.com/google/gson\n\nval KAFKA_SERVER = \"kafka:9092\"\nval KAFKA_CONSUMER_GROUP = \"workshopconsumer\"\nval SOURCE_TOPIC = \"scanned-item\"\nval SINK_TOPIC = \"enriched-item\"\n\n// TODO: definir estrutura de datos para el item scaneado\n// - store: Double\n// - barcode: String\n// - amount: Double\n// - price: Double\n@SerialVersionUID(100L)\nclass ScannedItem(...) extends Serializable {}\n\n// TODO: definir estructura de datos para el item enriquecido\n// - name: String\n// - department: String\n// - barcode: String\n// - amount: Double\n// - store: Double\n@SerialVersionUID(100L)\nclass EnrichedItem(...) extends Serializable {}\n\n// TODO: definir la configurción del productor o sink\nval producerProperties = new Properties\nproducerProperties.setProperty(\"\", KAFKA_SERVER)\n\nval producer = new FlinkKafkaProducer[String](\n SINK_TOPIC,\n new SimpleStringSchema(),\n producerProperties\n)\n\n// TODO: definir la configuracion del consumidor o source\nval consumerProperties = new Properties()\nconsumerProperties.setProperty(\"\", KAFKA_SERVER)\nconsumerProperties.setProperty(\"\", KAFKA_CONSUMER_GROUP)\n\nval stream = senv.addSource(new FlinkKafkaConsumer[String](\n SOURCE_TOPIC,\n new SimpleStringSchema(),\n consumerProperties)\n )\n .map((scannedItemJSON) => {\n // TODO: parsear el json del scannedItem\n // - Convertir el item json en la estructura ScannedItem\n // ...\n scannedItem\n })\n .map((item) => {\n // TODO: enriquecer el item escaneado\n // - Pedir la información adicional del item a su API\n // - Convertir el ScannedItem en EnrichItem\n // ...\n enrichedItem\n })\n .map((item) => {\n val gson = new Gson()\n gson.toJson(item)\n })\n \n// documentación de la lista de operadores en Flink https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/\n\nstream.addSink(producer)\n\n\nsenv.execute(\"data-enrichment\")",
"user": "anonymous",
"dateUpdated": "2021-04-21T12:51:08+0000",
"progress": 0,
"config": {
"editorSetting": {
"language": "scala",
"editOnDblClick": false,
"completionKey": "TAB",
"completionSupport": true
},
"colWidth": 12,
"editorMode": "ace/mode/scala",
"fontSize": 9,
"results": {},
"enabled": true
},
"settings": {
"params": {},
"forms": {}
},
"apps": [],
"runtimeInfos": {},
"progressUpdateIntervalMs": 500,
"jobName": "paragraph_1619009078297_1671814897",
"id": "paragraph_1618934937455_1329533557",
"dateCreated": "2021-04-21T12:44:38+0000",
"status": "READY",
"$$hashKey": "object:1252"
},
{
"text": "%flink\n",
"user": "anonymous",
"dateUpdated": "2021-04-21T12:44:38+0000",
"progress": 0,
"config": {
"editorSetting": {
"language": "scala",
"editOnDblClick": false,
"completionKey": "TAB",
"completionSupport": true
},
"colWidth": 12,
"editorMode": "ace/mode/scala",
"fontSize": 9,
"results": {},
"enabled": true
},
"settings": {
"params": {},
"forms": {}
},
"apps": [],
"runtimeInfos": {},
"progressUpdateIntervalMs": 500,
"jobName": "paragraph_1619009078297_273626584",
"id": "paragraph_1618945388418_172405627",
"dateCreated": "2021-04-21T12:44:38+0000",
"status": "READY",
"$$hashKey": "object:1253"
}
],
"name": "Flink Kafka Workshop",
"id": "2G4WVECTU",
"defaultInterpreterGroup": "spark",
"version": "0.9.0",
"noteParams": {},
"noteForms": {},
"angularObjects": {},
"config": {
"isZeppelinNotebookCronEnable": false,
"looknfeel": "default",
"personalizedMode": "false"
},
"info": {},
"path": "/Flink Kafka Workshop"
}