Skip to content

Commit

Permalink
Changes for deprecated models
Browse files Browse the repository at this point in the history
  • Loading branch information
myui committed Dec 9, 2024
1 parent 25e0c74 commit 9c55ac5
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 117 deletions.
File renamed without changes.
File renamed without changes.
172 changes: 57 additions & 115 deletions rtrec/models/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,52 @@ def _get_rating(self, user_id: int, item_id: int) -> float:
"""
return self.interactions.get_user_item_rating(user_id, item_id, default_rating=0.0)

def recommend_batch(self, users: List[Any], top_k: int = 10, filter_interacted: bool = True) -> List[List[Any]]:
def fit(self, user_interactions: Iterable[Tuple[Any, Any, float, float]], update_interaction: bool=False) -> None:
for user, item, tstamp, rating in user_interactions:
try:
user_id = self.user_ids.identify(user)
item_id = self.item_ids.identify(item)
self.interactions.add_interaction(user_id, item_id, tstamp, rating, upsert=update_interaction)
self._update(user_id, item_id)
except Exception as e:
logging.warning(f"Error processing interaction: {e}")
continue

@abstractmethod
def _update(self, user_id: int, item_id: int, rating: float) -> None:
raise NotImplementedError("The _update method must be implemented by the subclass.")

def predict_rating(self, user: Any, item: Any) -> float:
user_id = self.user_ids.get_id(user)
item_id = self.item_ids.get_id(item)
if user_id is None or item_id is None:
return 0.0
return self._predict_rating(user_id, item_id, bypass_prediction=True)

def predict_rating_batch(self, users: List[Any], items: List[Any]) -> List[float]:
"""
Recommend top-K items for a list of users.
Predict ratings for a list of user-item pairs.
:param users: List of user indices
:param top_k: Number of top items to recommend
:param filter_interacted: Whether to filter out items the user has already interacted with
:return: List of top-K item indices recommended for each user
:param items: List of item indices
:return: List of predicted ratings for each user-item pair
"""
return [self.recommend(user, top_k, filter_interacted) for user in users]
user_ids = [self.user_ids.get_id(user) for user in users]
item_ids = [self.item_ids.get_id(item) for item in items]

return [
self._predict_rating(user_id, item_id, bypass_prediction=False) if user_id is not None and item_id is not None else 0.0
for user_id, item_id in zip(user_ids, item_ids)
]

@abstractmethod
def _predict_rating(self, user_id: int, item_id: int, bypass_prediction: bool=False) -> float:
"""
Compute the derivative of the loss function.
:param user_id: User index
:param item_id: Item index
:param bypass_prediction: Flag to bypass prediction if user has only interacted with the item (default: False)
"""
raise NotImplementedError("The _predict_rating method must be implemented by the subclass.")

def recommend(self, user: Any, top_k: int = 10, filter_interacted: bool = True) -> List[Any]:
"""
Expand All @@ -80,7 +117,10 @@ def recommend(self, user: Any, top_k: int = 10, filter_interacted: bool = True)
candidate_item_ids = self.interactions.get_all_non_interacted_items(user_id) if filter_interacted else self.interactions.get_all_non_negative_items(user_id)

# Predict scores for candidate items
scores = self._predict(user_id, candidate_item_ids)
scores = [
self._predict_rating(user_id, item_id, bypass_prediction=False)
for item_id in candidate_item_ids
]

# Map item IDs back to original items
candidate_items = [self.item_ids[id] for id in candidate_item_ids]
Expand All @@ -90,6 +130,16 @@ def recommend(self, user: Any, top_k: int = 10, filter_interacted: bool = True)
# return sorted(candidate_items, key=dict(zip(candidate_items, scores)).get, reverse=True)[:top_k]
return [k for k, v in sorted(zip(candidate_items, scores), key=lambda x: x[1], reverse=True)[:top_k]]

def recommend_batch(self, users: List[Any], top_k: int = 10, filter_interacted: bool = True) -> List[List[Any]]:
"""
Recommend top-K items for a list of users.
:param users: List of user indices
:param top_k: Number of top items to recommend
:param filter_interacted: Whether to filter out items the user has already interacted with
:return: List of top-K item indices recommended for each user
"""
return [self.recommend(user, top_k, filter_interacted) for user in users]

def similar_items(self, query_items: List[Any], top_k: int = 10, filter_query_items: bool = True) -> List[List[Any]]:
"""
Find similar items for a list of query items.
Expand Down Expand Up @@ -145,111 +195,3 @@ def _predict(self, user_id: int, item_ids: List[int]) -> List[float]:
Predict scores for a list of items.
"""
raise NotImplementedError("The _predict method must be implemented by the subclass.")

class ImplicitFeedbackRecommender(BaseRecommender):

def __init__(self, **kwargs: Any):
"""
Initialize the implicit feedback recommender model.
:param kwargs: Additional keyword arguments for the model
"""
super().__init__(**kwargs)

def fit(self, user_interactions: Iterable[Tuple[int, float, Any, Any]], update_interaction: bool=False) -> None:
"""
Incrementally fit the BPRSLIM model with user interactions.
:param user_interactions: List of (user, positive_item, negative_item) tuples
:param epoch: Current training epoch number (default is 0)
"""
for user, tstamp, positive_item, negative_item in user_interactions:
# Update user-item interactions
try:
user_id = self.user_ids.identify(user)
positive_item_id = self.item_ids.identify(positive_item)
negative_item_id = self.item_ids.identify(negative_item)
self.interactions.add_interaction(user_id, positive_item_id, tstamp, delta=1, upsert=update_interaction)
self.interactions.add_interaction(user_id, negative_item_id, tstamp, delta=-1, upsert=update_interaction)
self._update(user_id, positive_item_id, negative_item_id)
except Exception as e:
logging.warning(f"Error processing interaction: {e}")
continue


@abstractmethod
def _update(self, user: int, positive_item_id: int, negative_item_id: int) -> None:
"""
Incremental weight update based on BPR loss.
:param user: User index
:param positive_item_id: Item index for the positive sample
:param negative_item_id: Item index for the negative sample
"""
raise NotImplementedError("The _update method must be implemented by the subclass.")


class ExplicitFeedbackRecommender(BaseRecommender):

def __init__(self, **kwargs: Any):
"""
Initialize the implicit feedback recommender model.
:param kwargs: Additional keyword arguments for the model
"""
super().__init__(**kwargs)

def fit(self, user_interactions: Iterable[Tuple[Any, Any, float, float]], update_interaction: bool=False) -> None:
for user, item, tstamp, rating in user_interactions:
try:
user_id = self.user_ids.identify(user)
item_id = self.item_ids.identify(item)
self.interactions.add_interaction(user_id, item_id, tstamp, rating, upsert=update_interaction)
self._update(user_id, item_id)
except Exception as e:
logging.warning(f"Error processing interaction: {e}")
continue

@abstractmethod
def _update(self, user_id: int, item_id: int, rating: float) -> None:
raise NotImplementedError("The _update method must be implemented by the subclass.")

def _predict(self, user_id: int, item_ids: List[int]) -> List[float]:
"""
Predict scores for a list of items.
"""
return [
self._predict_rating(user_id, item_id, bypass_prediction=False)
for item_id in item_ids
]

def predict_rating_batch(self, users: List[Any], items: List[Any]) -> List[float]:
"""
Predict ratings for a list of user-item pairs.
:param users: List of user indices
:param items: List of item indices
:return: List of predicted ratings for each user-item pair
"""
user_ids = [self.user_ids.get_id(user) for user in users]
item_ids = [self.item_ids.get_id(item) for item in items]

return [
self._predict_rating(user_id, item_id, bypass_prediction=False) if user_id is not None and item_id is not None else 0.0
for user_id, item_id in zip(user_ids, item_ids)
]

def predict_rating(self, user: Any, item: Any) -> float:
user_id = self.user_ids.get_id(user)
item_id = self.item_ids.get_id(item)
if user_id is None or item_id is None:
return 0.0
return self._predict_rating(user_id, item_id, bypass_prediction=True)

@abstractmethod
def _predict_rating(self, user_id: int, item_id: int, bypass_prediction: bool=False) -> float:
"""
Compute the derivative of the loss function.
:param user_id: User index
:param item_id: Item index
:param bypass_prediction: Flag to bypass prediction if user has only interacted with the item (default: False)
"""
raise NotImplementedError("The _predict_rating method must be implemented by the subclass.")

def inv_scaling(alpha: float, step: int, power_t: float) -> float:
return alpha / pow(step, power_t)
4 changes: 2 additions & 2 deletions rtrec/models/slim.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
from typing import Any, List, Optional
from math import inf

from .base import ExplicitFeedbackRecommender
from .base import BaseRecommender

class SLIM_MSE(ExplicitFeedbackRecommender):
class SLIM_MSE(BaseRecommender):
def __init__(self, **kwargs: Any):
super().__init__(**kwargs)

Expand Down
3 changes: 3 additions & 0 deletions rtrec/utils/optim.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,9 @@ def update_gradients(self, item_idx: int, grad: float) -> float:

return weight_update

def inv_scaling(alpha: float, step: int, power_t: float) -> float:
return alpha / pow(step, power_t)

def get_optimizer(name: str, **kwargs: Any) -> Optimizer:
"""
Create an instance of an Optimizer based on the provided name and keyword arguments.
Expand Down

0 comments on commit 9c55ac5

Please sign in to comment.