forked from toptal/chewy
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathscrolling.rb
137 lines (126 loc) · 5.85 KB
/
scrolling.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
module Chewy
  module Search
    # This module contains batch requests DSL via ES scroll API. All the methods
    # are optimized on memory consumption, they are not caching anything, so
    # use them when you need to do some single-run stuff on a huge amount of
    # documents. Don't forget to tune the `scroll` parameter for long-lasting
    # actions.
    # All the scroll methods respect the limit value if provided.
    #
    # @see https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-scroll.html
    module Scrolling
      # Iterates through the documents of the scope in batches. Each request
      # uses `batch_size` instead of the scope limit as its `size`, but the
      # total number of hits yielded never exceeds the limit (if one is set).
      # There are 2 possible use-cases: with a block or without.
      #
      # The scroll context is cleared when iteration ends, including when the
      # block raises or breaks out early (e.g. `scroll_batches.first`).
      # Iteration also stops on the first empty page, so a total that
      # overstates the available hits cannot cause an endless scroll loop.
      #
      # @param batch_size [Integer] batch size obviously, replaces `size` query parameter
      # @param scroll [String] cursor expiration time
      #
      # @overload scroll_batches(batch_size: 1000, scroll: '1m')
      #   @example
      #     PlaceIndex.scroll_batches { |batch| batch.each { |hit| p hit['_id'] } }
      #   @yieldparam batch [Array<Hash>] block is executed for each non-empty batch of hits
      #
      # @overload scroll_batches(batch_size: 1000, scroll: '1m')
      #   @example
      #     PlaceIndex.scroll_batches.flat_map { |batch| batch.map { |hit| hit['_id'] } }
      #   @return [Enumerator] a standard ruby Enumerator
      def scroll_batches(batch_size: Request::DEFAULT_BATCH_SIZE, scroll: Request::DEFAULT_SCROLL)
        return enum_for(:scroll_batches, batch_size: batch_size, scroll: scroll) unless block_given?

        result = perform(size: batch_size, scroll: scroll)
        # Number of hits to yield overall: the scope limit (if any) capped by
        # the total reported in the first response.
        total = [raw_limit_value, result.fetch('hits', {}).fetch('total', {}).fetch('value', 0)].compact.min
        fetched = 0
        scroll_id = nil

        loop do
          # Capture the id before yielding so the `ensure` below can clear the
          # context even if the block raises or breaks on this very batch.
          scroll_id = result['_scroll_id']
          remaining = total - fetched
          hits = result.fetch('hits', {}).fetch('hits', [])
          # Nothing is owed (e.g. `limit(0)`) or the cursor is exhausted.
          break if remaining <= 0 || hits.empty?

          # Trim the page to what is still owed so the limit is never exceeded,
          # even when a page comes back shorter than `batch_size`.
          hits = hits.first(remaining)
          fetched += hits.size
          yield(hits)
          break if fetched >= total

          result = perform_scroll(scroll: scroll, scroll_id: scroll_id)
        end
      ensure
        Chewy.client.clear_scroll(body: {scroll_id: scroll_id}) if scroll_id
      end

      # @!method scroll_hits(batch_size: 1000, scroll: '1m')
      # Iterates through the documents of the scope in batches. Yields each hit separately.
      #
      # @param batch_size [Integer] batch size obviously, replaces `size` query parameter
      # @param scroll [String] cursor expiration time
      #
      # @overload scroll_hits(batch_size: 1000, scroll: '1m')
      #   @example
      #     PlaceIndex.scroll_hits { |hit| p hit['_id'] }
      #   @yieldparam hit [Hash] block is executed for each hit
      #
      # @overload scroll_hits(batch_size: 1000, scroll: '1m')
      #   @example
      #     PlaceIndex.scroll_hits.map { |hit| hit['_id'] }
      #   @return [Enumerator] a standard ruby Enumerator
      def scroll_hits(**options, &block)
        return enum_for(:scroll_hits, **options) unless block_given?

        scroll_batches(**options).each do |batch|
          batch.each(&block)
        end
      end

      # @!method scroll_wrappers(batch_size: 1000, scroll: '1m')
      # Iterates through the documents of the scope in batches. Yields
      # each hit wrapped with {Chewy::Index}.
      #
      # @param batch_size [Integer] batch size obviously, replaces `size` query parameter
      # @param scroll [String] cursor expiration time
      #
      # @overload scroll_wrappers(batch_size: 1000, scroll: '1m')
      #   @example
      #     PlaceIndex.scroll_wrappers { |object| p object.id }
      #   @yieldparam object [Chewy::Index] block is executed for each hit object
      #
      # @overload scroll_wrappers(batch_size: 1000, scroll: '1m')
      #   @example
      #     PlaceIndex.scroll_wrappers.map { |object| object.id }
      #   @return [Enumerator] a standard ruby Enumerator
      def scroll_wrappers(**options)
        return enum_for(:scroll_wrappers, **options) unless block_given?

        scroll_hits(**options).each do |hit|
          # Resolve the index class from the hit's `_index` and wrap the raw hit.
          yield loader.derive_index(hit['_index']).build(hit)
        end
      end

      # @!method scroll_objects(batch_size: 1000, scroll: '1m')
      # Iterates through the documents of the scope in batches. Performs load
      # operation for each batch and then yields each loaded ORM/ODM object.
      # Uses {Chewy::Search::Request#load} passed options for loading.
      #
      # @note If the record is not found it yields nil instead.
      # @see Chewy::Search::Request#load
      # @see Chewy::Search::Loader
      # @param batch_size [Integer] batch size obviously, replaces `size` query parameter
      # @param scroll [String] cursor expiration time
      #
      # @overload scroll_objects(batch_size: 1000, scroll: '1m')
      #   @example
      #     PlaceIndex.scroll_objects { |record| p record.id }
      #   @yieldparam record [Object] block is executed for each record loaded
      #
      # @overload scroll_objects(batch_size: 1000, scroll: '1m')
      #   @example
      #     PlaceIndex.scroll_objects.map { |record| record.id }
      #   @return [Enumerator] a standard ruby Enumerator
      def scroll_objects(**options, &block)
        return enum_for(:scroll_objects, **options) unless block_given?

        # Records are loaded from the hits, so the document source and any
        # extra field payloads are dropped from the scroll requests.
        except(:source, :stored_fields, :script_fields, :docvalue_fields)
          .source(false).scroll_batches(**options).each do |batch|
          loader.load(batch).each(&block)
        end
      end
      alias_method :scroll_records, :scroll_objects
      alias_method :scroll_documents, :scroll_objects

    private

      # Fetches the next scroll page, instrumented as a `search_query.chewy`
      # notification with the request body in the payload.
      def perform_scroll(body)
        ActiveSupport::Notifications.instrument 'search_query.chewy', notification_payload(request: body) do
          Chewy.client.scroll(body)
        end
      end
    end
  end
end