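"""BM25 ranking (BM25+ variant) over a pandas corpus, with optional
pseudo-relevance feedback query expansion."""
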
from functools import lru_cache
import math
import re
from typing import Set

import pandas as pd
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
from tqdm import tqdm

STOP_WORDS = set(stopwords.words('english'))


class BM25:
    """
    Implementation of the BM25 information retrieval algorithm
    (BM25+ variant) with optional pseudo-relevance feedback.
    """

    def __init__(self, corpus: pd.DataFrame, k1: float, b: float, delta: float,
                 rf_docs: int, rf_terms: int) -> None:
        """
        BM25 class initializer.

        params:
            corpus: pandas DataFrame with an "id" column and a "text" column
            k1 (float): parameter controlling the impact of term frequency
            b (float): parameter controlling the impact of document length
            delta (float): normalizing parameter introduced in the BM25+ variant
            rf_docs (int): number of documents to use for pseudo-relevance feedback
            rf_terms (int): number of terms to use for pseudo-relevance feedback
        """
        self._corpus = corpus
        self.k1 = k1
        self.b = b
        self.delta = delta
        self.rf_docs = rf_docs
        self.rf_terms = rf_terms
        tqdm.pandas(desc='{:<25}'.format('Preprocessing corpus'))
        self._corpus["parsed_text"] = self._corpus["text"].progress_apply(
            self._preprocess_text)
        tqdm.pandas(desc='{:<25}'.format('Computing word counts'))
        self._corpus["word_count"] = self._corpus["parsed_text"].progress_apply(
            lambda text: len(text.split(" ")))
        self._avg_word_count = self._corpus["word_count"].mean()
        tqdm.pandas(desc='{:<25}'.format('Building vocabulary'))
        self._vocabulary = self._build_vocabulary()
        self._tf = self._compute_tf()
        tqdm.pandas(desc='{:<25}'.format('Computing idf'))
        self._idf = self._compute_idf()
        self._corpus.drop(columns=["parsed_text", "text"], inplace=True)

    def _build_vocabulary(self) -> Set[str]:
        """
        Return the set of all unique words in the corpus, based on the
        "parsed_text" column of self._corpus.

        returns:
            vocabulary (set): set of all unique words in the corpus
        """
        vocabulary = set()
        self._corpus["parsed_text"].progress_apply(
            lambda text: vocabulary.update(text.split(" ")))
        vocabulary.discard("")
        return vocabulary

    def _compute_tf(self) -> pd.DataFrame:
        """
        Create a DataFrame with the term frequency of each term in the vocabulary.

        returns:
            tf (pd.DataFrame): DataFrame indexed by document id, with a column for
                each unique word in the corpus containing the number of times that
                word appears in the document.
        """
        term_frequency = pd.DataFrame(
            index=self._corpus["id"], columns=list(self._vocabulary))
        for _, row in tqdm(self._corpus.iterrows(), desc='{:<25}'.format("Computing term frequency"),
                           total=len(self._corpus)):
            tmp = row["parsed_text"].split(" ")
            for term in set(tmp) - {""}:
                term_frequency.loc[row["id"], term] = tmp.count(term)
        term_frequency.fillna(0, inplace=True)
        return term_frequency

    def _compute_idf(self) -> pd.DataFrame:
        """
        Compute the inverse document frequency of each term in the vocabulary,
        using the smoothed formula idf = log(N / (df + 1)), where N is the number
        of documents and df is the number of documents containing the term.

        returns:
            idf (pd.DataFrame): DataFrame indexed by word, with an "idf" column
                containing the inverse document frequency of that word.
        """
        vocabulary = list(self._vocabulary)
        idf = pd.DataFrame(index=vocabulary)
        # Document frequency counts the documents in which the term occurs at
        # least once, not the total number of occurrences across the corpus.
        idf["idf"] = pd.Series(vocabulary, index=vocabulary).progress_apply(
            lambda word: math.log(len(self._corpus) / ((self._tf[word] > 0).sum() + 1)))
        return idf

    @staticmethod
    def _preprocess_text(text: str) -> str:
        """
        Preprocess a string by:
        1. Lowercasing it
        2. Removing punctuation and other non-alphanumeric characters
        3. Lemmatizing each word with WordNet's lemmatizer
        4. Removing stopwords (assumed to be English)

        params:
            text (str): string to preprocess
        returns:
            text (str): preprocessed string
        """
        # Instantiate the lemmatizer once per call rather than once per word.
        lemmatizer = WordNetLemmatizer()
        text = text.lower()
        text = re.sub(r"[^a-z0-9 ]", "", text)
        words = [lemmatizer.lemmatize(word) for word in text.split(" ")]
        return " ".join(word for word in words if word not in STOP_WORDS)

    def _score_document(self, corpus_row: pd.Series, query_terms: Set[str]) -> float:
        """
        Compute the BM25 score for a document, given a set of query terms:
        score(d, q) = sum over t in q of w_t

        params:
            corpus_row (pd.Series): corpus row containing the document to score
            query_terms (Set[str]): set of query terms
        returns:
            score (float): BM25 score for the document
        """
        score = sum(self._score_term(corpus_row, term)
                    for term in query_terms if term in self._vocabulary)
        return round(score, 4)

    def _score_term(self, corpus_row: pd.Series, term: str) -> float:
        """
        Given a corpus row and a term, return the BM25 score for that term:
        w_t = ((k1 + 1) * tf_t + delta) * idf_t
              / ((k1 + tf_t) * (1 - b + b * doc_length / avg_doc_length))
        """
        tf = self._tf.loc[corpus_row["id"], term]
        term_idf = self._idf.loc[term, "idf"]
        return ((self.k1 + 1) * tf + self.delta) * term_idf / (
            (self.k1 + tf) * (1 - self.b + self.b * corpus_row["word_count"] / self._avg_word_count))

    def _compute_offer_weights(self, results: pd.DataFrame) -> pd.Series:
        """
        Compute the offer weight of each term from the documents in the results
        DataFrame, to power the pseudo-relevance feedback:
        OW_t = (number of result documents containing term t) * idf_t

        params:
            results (pd.DataFrame): DataFrame with an "id" column and a "score"
                column holding the initial results.
        returns:
            offer_weights (pd.Series): Series with the offer weight of each term,
                sorted in descending order.
        """
        # Count, for each term, how many result documents contain it, then weight
        # the counts by idf. Series.iteritems was removed in pandas 2.0, so the
        # weighting is done with index-aligned multiplication instead of a loop.
        doc_counts = self._tf.loc[results["id"], :].astype(bool).sum(axis=0)
        offer_weights = doc_counts * self._idf["idf"]
        return offer_weights.sort_values(ascending=False)

    @lru_cache
    def query(self, query: str, max_results: int, expand=False) -> pd.DataFrame:
        """
        Given a query string, return a DataFrame with the top max_results results,
        ranked by BM25 score. If expand is True, the query is expanded with the
        terms that have the highest offer weight (see _compute_offer_weights)
        among the top rf_docs initial results, and re-run once.

        params:
            query (str): query string
            max_results (int): maximum number of results to return
            expand (bool): if True, return results for the expanded query,
                otherwise return results for the original query
        returns:
            results (pd.DataFrame): DataFrame with the top max_results results
        """
        query = self._preprocess_text(query)
        query_terms = set(query.split(" "))
        results = self._corpus.copy().drop(columns="word_count")
        results["score"] = self._corpus.apply(
            lambda row: self._score_document(row, query_terms), axis=1)
        results = results[results["score"] > 0]
        results = results.sort_values(by="score", ascending=False).head(max_results)
        if expand:
            # Pseudo-relevance feedback: take the rf_docs best initial results,
            # add the rf_terms terms with the highest offer weight to the query,
            # and run the expanded query once (expand=False prevents recursion).
            offer_weights = self._compute_offer_weights(
                results.head(self.rf_docs))
            additional_terms = set(
                offer_weights.index[:self.rf_terms].to_list())
            query_terms = query_terms.union(additional_terms)
            new_query = " ".join(query_terms)
            print(f"Expanded query: {new_query}")
            return self.query(new_query, max_results, expand=False)
        return results
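

if __name__ == "__main__":
    # Minimal usage sketch, not part of the original module: it assumes the
    # NLTK "stopwords" and "wordnet" data are already downloaded (e.g. via
    # nltk.download), and the parameter values below are illustrative, not tuned.
    corpus = pd.DataFrame({
        "id": [1, 2, 3],
        "text": [
            "The quick brown fox jumps over the lazy dog",
            "A fast auburn fox leapt over a sleeping hound",
            "Stock prices rose sharply in early trading today",
        ],
    })
    bm25 = BM25(corpus, k1=1.5, b=0.75, delta=1.0, rf_docs=2, rf_terms=3)
    # Plain BM25 ranking.
    print(bm25.query("quick fox", max_results=2))
    # Same query with pseudo-relevance feedback expansion.
    print(bm25.query("quick fox", max_results=2, expand=True))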