diff --git a/fuo_ytmusic/provider.py b/fuo_ytmusic/provider.py index a8b709d..c6216dc 100644 --- a/fuo_ytmusic/provider.py +++ b/fuo_ytmusic/provider.py @@ -1,3 +1,5 @@ +import logging +from urllib.parse import urlparse, parse_qs from typing import List, Optional from feeluown.excs import NoUserLoggedIn @@ -14,6 +16,8 @@ from fuo_ytmusic.models import Categories, YtBriefUserModel, YtmusicWatchPlaylistSong from fuo_ytmusic.service import YtmusicService, YtmusicType, YtmusicPrivacyStatus +logger = logging.getLogger(__name__) + class YtmusicProvider(AbstractProvider, ProviderV2): @@ -155,6 +159,29 @@ def song_list_quality(self, song) -> List[Quality.Audio]: return song_.list_formats() if song_ is not None else [] def song_get_media(self, song: SongModel, quality: Quality.Audio) -> Optional[Media]: + media = self._get_media(song, quality) + if media is None: + return media + url = media.url + # 推断(cosven): service.song_info 接口返回的 url 里面会记录请求时的 IP, + # 如果后面真正访问 url 时,如果自己的 IP 已经变了(比如自己的代理 IP 变了), + # 那么会碰到 403 错误。 + # + # 注:你或许会想,把 url 中的 IP 改变当前的 public IP,是不是就行了? + # 这其实也是不行的,因为整个 url 是已经有摘要信息的,它要和摘要匹配。 + if self.service.check_stream_url(url): + return media + parse_result = urlparse(url) + kvs = parse_qs(parse_result.query) + ips = kvs.get('ip', []) + ip = ips[0] if ips else '' + logger.info( + f"url for video({song.identifier}) is invalid now, will retry! " + f"maybe your public IP is changed (expected ip: {ip} ), (url: {url} )" + ) + return self._get_media(song, quality) + + def _get_media(self, song, quality: Quality.Audio): song_info = self.service.song_info(song.identifier) format_code, bitrate, format_str = song_info.get_media(quality) url = self.service.stream_url(song_info, song.identifier, format_code) diff --git a/fuo_ytmusic/service.py b/fuo_ytmusic/service.py index 7574258..b39f47c 100644 --- a/fuo_ytmusic/service.py +++ b/fuo_ytmusic/service.py @@ -344,13 +344,6 @@ def history(self) -> List[YtmusicHistorySong]: response = self.api.get_history() return [YtmusicHistorySong(**data) for data in response] - def stream_url(self, song_info: SongInfo, video_id: str, format_code: int) -> Optional[str]: - formats = song_info.streamingData.adaptiveFormats - for f in formats: - if int(f.itag) == format_code: - return self._get_stream_url(f, video_id) - return None - def create_playlist( self, title: str, @@ -408,9 +401,18 @@ def delete_upload_song(self, entity_id: str) -> str: # STATUS_SUCCEEDED return self.api.delete_upload_entity(entity_id) - def _get_stream_url( - self, f: SongInfo.StreamingData.Format, video_id: str, retry=True - ) -> Optional[str]: + def stream_url(self, song_info: SongInfo, video_id: str, format_code: int) -> Optional[str]: + formats = song_info.streamingData.adaptiveFormats + for f in formats: + if int(f.itag) == format_code: + return self._get_stream_url(f, video_id) + return None + + def check_stream_url(self, url): + resp = self._session.head(url) + return resp.status_code != 403 + + def _get_stream_url(self, f: SongInfo.StreamingData.Format, video_id: str) -> Optional[str]: if f.url is not None and f.url != "": return f.url sig_ch = f.signatureCipher @@ -422,12 +424,6 @@ def _get_stream_url( res[key] = unquote(sig[len(key + "=") :]) signature = self.get_cipher().get_signature(ciphered_signature=res["s"]) _url = res["url"] + "&sig=" + signature - if retry: - r = self._session.head(_url) - if r.status_code == 403: - logger.info(f"{self._log_thread()} url for video({video_id}) is invalid, reset cipher and retry.") - self.reset_cipher() - return self._get_stream_url(f, video_id, retry=False) return _url