diff --git a/rtrec/models/bprslim.py b/examples/deprecated/bprslim.py similarity index 100% rename from rtrec/models/bprslim.py rename to examples/deprecated/bprslim.py diff --git a/rtrec/models/fm.py b/examples/deprecated/fm.py similarity index 100% rename from rtrec/models/fm.py rename to examples/deprecated/fm.py diff --git a/rtrec/models/base.py b/rtrec/models/base.py index 78faf39..9e0f002 100644 --- a/rtrec/models/base.py +++ b/rtrec/models/base.py @@ -53,15 +53,52 @@ def _get_rating(self, user_id: int, item_id: int) -> float: """ return self.interactions.get_user_item_rating(user_id, item_id, default_rating=0.0) - def recommend_batch(self, users: List[Any], top_k: int = 10, filter_interacted: bool = True) -> List[List[Any]]: + def fit(self, user_interactions: Iterable[Tuple[Any, Any, float, float]], update_interaction: bool=False) -> None: + for user, item, tstamp, rating in user_interactions: + try: + user_id = self.user_ids.identify(user) + item_id = self.item_ids.identify(item) + self.interactions.add_interaction(user_id, item_id, tstamp, rating, upsert=update_interaction) + self._update(user_id, item_id) + except Exception as e: + logging.warning(f"Error processing interaction: {e}") + continue + + @abstractmethod + def _update(self, user_id: int, item_id: int, rating: float) -> None: + raise NotImplementedError("The _update method must be implemented by the subclass.") + + def predict_rating(self, user: Any, item: Any) -> float: + user_id = self.user_ids.get_id(user) + item_id = self.item_ids.get_id(item) + if user_id is None or item_id is None: + return 0.0 + return self._predict_rating(user_id, item_id, bypass_prediction=True) + + def predict_rating_batch(self, users: List[Any], items: List[Any]) -> List[float]: """ - Recommend top-K items for a list of users. + Predict ratings for a list of user-item pairs. :param users: List of user indices - :param top_k: Number of top items to recommend - :param filter_interacted: Whether to filter out items the user has already interacted with - :return: List of top-K item indices recommended for each user + :param items: List of item indices + :return: List of predicted ratings for each user-item pair """ - return [self.recommend(user, top_k, filter_interacted) for user in users] + user_ids = [self.user_ids.get_id(user) for user in users] + item_ids = [self.item_ids.get_id(item) for item in items] + + return [ + self._predict_rating(user_id, item_id, bypass_prediction=False) if user_id is not None and item_id is not None else 0.0 + for user_id, item_id in zip(user_ids, item_ids) + ] + + @abstractmethod + def _predict_rating(self, user_id: int, item_id: int, bypass_prediction: bool=False) -> float: + """ + Compute the derivative of the loss function. + :param user_id: User index + :param item_id: Item index + :param bypass_prediction: Flag to bypass prediction if user has only interacted with the item (default: False) + """ + raise NotImplementedError("The _predict_rating method must be implemented by the subclass.") def recommend(self, user: Any, top_k: int = 10, filter_interacted: bool = True) -> List[Any]: """ @@ -80,7 +117,10 @@ def recommend(self, user: Any, top_k: int = 10, filter_interacted: bool = True) candidate_item_ids = self.interactions.get_all_non_interacted_items(user_id) if filter_interacted else self.interactions.get_all_non_negative_items(user_id) # Predict scores for candidate items - scores = self._predict(user_id, candidate_item_ids) + scores = [ + self._predict_rating(user_id, item_id, bypass_prediction=False) + for item_id in candidate_item_ids + ] # Map item IDs back to original items candidate_items = [self.item_ids[id] for id in candidate_item_ids] @@ -90,6 +130,16 @@ def recommend(self, user: Any, top_k: int = 10, filter_interacted: bool = True) # return sorted(candidate_items, key=dict(zip(candidate_items, scores)).get, reverse=True)[:top_k] return [k for k, v in sorted(zip(candidate_items, scores), key=lambda x: x[1], reverse=True)[:top_k]] + def recommend_batch(self, users: List[Any], top_k: int = 10, filter_interacted: bool = True) -> List[List[Any]]: + """ + Recommend top-K items for a list of users. + :param users: List of user indices + :param top_k: Number of top items to recommend + :param filter_interacted: Whether to filter out items the user has already interacted with + :return: List of top-K item indices recommended for each user + """ + return [self.recommend(user, top_k, filter_interacted) for user in users] + def similar_items(self, query_items: List[Any], top_k: int = 10, filter_query_items: bool = True) -> List[List[Any]]: """ Find similar items for a list of query items. @@ -145,111 +195,3 @@ def _predict(self, user_id: int, item_ids: List[int]) -> List[float]: Predict scores for a list of items. """ raise NotImplementedError("The _predict method must be implemented by the subclass.") - -class ImplicitFeedbackRecommender(BaseRecommender): - - def __init__(self, **kwargs: Any): - """ - Initialize the implicit feedback recommender model. - :param kwargs: Additional keyword arguments for the model - """ - super().__init__(**kwargs) - - def fit(self, user_interactions: Iterable[Tuple[int, float, Any, Any]], update_interaction: bool=False) -> None: - """ - Incrementally fit the BPRSLIM model with user interactions. - :param user_interactions: List of (user, positive_item, negative_item) tuples - :param epoch: Current training epoch number (default is 0) - """ - for user, tstamp, positive_item, negative_item in user_interactions: - # Update user-item interactions - try: - user_id = self.user_ids.identify(user) - positive_item_id = self.item_ids.identify(positive_item) - negative_item_id = self.item_ids.identify(negative_item) - self.interactions.add_interaction(user_id, positive_item_id, tstamp, delta=1, upsert=update_interaction) - self.interactions.add_interaction(user_id, negative_item_id, tstamp, delta=-1, upsert=update_interaction) - self._update(user_id, positive_item_id, negative_item_id) - except Exception as e: - logging.warning(f"Error processing interaction: {e}") - continue - - - @abstractmethod - def _update(self, user: int, positive_item_id: int, negative_item_id: int) -> None: - """ - Incremental weight update based on BPR loss. - :param user: User index - :param positive_item_id: Item index for the positive sample - :param negative_item_id: Item index for the negative sample - """ - raise NotImplementedError("The _update method must be implemented by the subclass.") - - -class ExplicitFeedbackRecommender(BaseRecommender): - - def __init__(self, **kwargs: Any): - """ - Initialize the implicit feedback recommender model. - :param kwargs: Additional keyword arguments for the model - """ - super().__init__(**kwargs) - - def fit(self, user_interactions: Iterable[Tuple[Any, Any, float, float]], update_interaction: bool=False) -> None: - for user, item, tstamp, rating in user_interactions: - try: - user_id = self.user_ids.identify(user) - item_id = self.item_ids.identify(item) - self.interactions.add_interaction(user_id, item_id, tstamp, rating, upsert=update_interaction) - self._update(user_id, item_id) - except Exception as e: - logging.warning(f"Error processing interaction: {e}") - continue - - @abstractmethod - def _update(self, user_id: int, item_id: int, rating: float) -> None: - raise NotImplementedError("The _update method must be implemented by the subclass.") - - def _predict(self, user_id: int, item_ids: List[int]) -> List[float]: - """ - Predict scores for a list of items. - """ - return [ - self._predict_rating(user_id, item_id, bypass_prediction=False) - for item_id in item_ids - ] - - def predict_rating_batch(self, users: List[Any], items: List[Any]) -> List[float]: - """ - Predict ratings for a list of user-item pairs. - :param users: List of user indices - :param items: List of item indices - :return: List of predicted ratings for each user-item pair - """ - user_ids = [self.user_ids.get_id(user) for user in users] - item_ids = [self.item_ids.get_id(item) for item in items] - - return [ - self._predict_rating(user_id, item_id, bypass_prediction=False) if user_id is not None and item_id is not None else 0.0 - for user_id, item_id in zip(user_ids, item_ids) - ] - - def predict_rating(self, user: Any, item: Any) -> float: - user_id = self.user_ids.get_id(user) - item_id = self.item_ids.get_id(item) - if user_id is None or item_id is None: - return 0.0 - return self._predict_rating(user_id, item_id, bypass_prediction=True) - - @abstractmethod - def _predict_rating(self, user_id: int, item_id: int, bypass_prediction: bool=False) -> float: - """ - Compute the derivative of the loss function. - :param user_id: User index - :param item_id: Item index - :param bypass_prediction: Flag to bypass prediction if user has only interacted with the item (default: False) - """ - raise NotImplementedError("The _predict_rating method must be implemented by the subclass.") - -def inv_scaling(alpha: float, step: int, power_t: float) -> float: - return alpha / pow(step, power_t) diff --git a/rtrec/models/slim.py b/rtrec/models/slim.py index 9bb0911..f26d2c9 100644 --- a/rtrec/models/slim.py +++ b/rtrec/models/slim.py @@ -3,9 +3,9 @@ from typing import Any, List, Optional from math import inf -from .base import ExplicitFeedbackRecommender +from .base import BaseRecommender -class SLIM_MSE(ExplicitFeedbackRecommender): +class SLIM_MSE(BaseRecommender): def __init__(self, **kwargs: Any): super().__init__(**kwargs) diff --git a/rtrec/utils/optim.py b/rtrec/utils/optim.py index ae1d390..09cd32e 100644 --- a/rtrec/utils/optim.py +++ b/rtrec/utils/optim.py @@ -109,6 +109,9 @@ def update_gradients(self, item_idx: int, grad: float) -> float: return weight_update +def inv_scaling(alpha: float, step: int, power_t: float) -> float: + return alpha / pow(step, power_t) + def get_optimizer(name: str, **kwargs: Any) -> Optimizer: """ Create an instance of an Optimizer based on the provided name and keyword arguments.