# 2_derive.flow.yaml
# Repository generated from the estuary/flow-template template.
import:
- 1_capture.flow.yaml
# Derived collections. Each one below is built from the
# examples/segment/events collection (see the `source` of every transform).
collections:
  # User profiles, mapping from user => {segments: [segment => status]}.
  # This derivation is designed for materialization into a persistent
  # key/value store, which provides backing storage of reduced segment sets
  # for each user. As such, this derivation is stateless and produces
  # partially combined roll-ups for each user, which are then fully reduced
  # upon materialization into a store.
  examples/segment/profiles:
    schema: schemas/derived.schema.yaml#/$defs/profile
    # One profile document per user.
    key: [/user]
    projections:
      # Expose the key as a named field.
      user_id: /user

    derivation:
      # No register is declared: this derivation is stateless (see above).
      transform:
        fromSegmentation:
          # Map segmentation to implied user profile.
          source:
            name: examples/segment/events
          publish:
            lambda: typescript

  # Membership is a mapping of (segment, user) => {segmentation status}.
  # The derivation is suited for materialization into sorted key/value
  # stores like ScyllaDB and BigTable, having efficient range-read
  # operations which can be used to walk the full membership of a segment
  # (even if very large). As with the "pull" user profile, this derivation
  # is stateless and produces partial roll-ups of the current segmentation
  # status for each composite key, which is fully reduced only upon
  # materialization into a store.
  examples/segment/memberships:
    schema: schemas/derived.schema.yaml#/$defs/membership
    # Composite key ordered segment-first, then user.
    key: [/segment/vendor, /segment/name, /user]
    projections:
      # Expose each key component as a named field.
      vendor: /segment/vendor
      segment_name: /segment/name
      user_id: /user

    derivation:
      # No register is declared: this derivation is stateless (see above).
      transform:
        fromSegmentation:
          # Map segmentation to implied membership segmentation status.
          source:
            name: examples/segment/events
          publish:
            lambda: typescript

  # Toggles are annotated events which change the status of a user, e.g.
  # from "added" to "removed". They do not include events which initially
  # add a user to a segment, but do include subsequent events which remove
  # the user or re-add them. Each event is annotated with a /previous event
  # which is the last event of the user's former status: /previous is a
  # "remove" if the present event is an "add", or vice versa.
  examples/segment/toggles:
    # An event document extended with a mandatory `previous` event.
    schema:
      $ref: schemas/event.schema.yaml
      properties:
        previous: { $ref: schemas/event.schema.yaml }
      required: [previous]
    key: [/event]
    projections:
      user_id: /user

    derivation:
      # Registers track the last event for each (segment, user), along with
      # a bit indicating whether the user has ever been added to the segment.
      register:
        # A register starts out empty: no event seen, no add recorded.
        initial: {}
        schema:
          type: object
          properties:
            event: { $ref: schemas/event.schema.yaml }
            # Either absent or exactly `true`; never `false`.
            firstAdd: { const: true }
          # Updates are merged into the existing register value.
          reduce: { strategy: merge }

      transform:
        fromSegmentation:
          source:
            name: examples/segment/events
          # Shuffle on (segment, user) so that each register holds the
          # state of exactly one user's membership in one segment.
          shuffle:
            key: [/segment/vendor, /segment/name, /user]
          # `update` writes to the register; `publish` emits the toggle.
          update:
            lambda: typescript
          publish:
            lambda: typescript
#
# Something to try: the profiles collection can be altered to a "push"
# model by reducing user segments within derivation registers, and then
# publishing each current, fully-reduced segment set. This works well with
# stateless materializations, like Webhooks or pub/sub streams, where the
# full set is required with each POST.
#
# To try it, replace the examples/segment/profiles entry under
# `collections:` with the following (uncommented and nested one level
# beneath `collections:`):
#
# examples/segment/profiles:
#   schema:
#     $ref: schemas/derived.schema.yaml#/$defs/profile
#     # Published values are already fully reduced. Just take the last.
#     reduce: { strategy: lastWriteWins }
#   key: [/user]
#   projections:
#     user_id: /user
#
#   derivation:
#     register:
#       # Source documents are shuffled to a register on /user.
#       # Within each register, we accumulate the user's segments.
#       # NOTE(review): confirm that the schema file still defines the
#       # `SegmentSet` anchor referenced here.
#       schema: schemas/derived.schema.yaml#SegmentSet
#       initial: []
#     transform:
#       fromSegmentation:
#         # Update maps the segmentation to its implied segment set, which
#         # is reduced into the register. Then publish the reduced register
#         # mapped into a profile.
#         source:
#           name: examples/segment/events
#         shuffle:
#           key: [/user]
#         update:
#           lambda: typescript
#         publish:
#           lambda: typescript