1
- import { elizaLogger } from "@elizaos/core" ;
1
+ import { elizaLogger , stringToUuid , getEmbeddingZeroVector } from "@elizaos/core" ;
2
2
import { ClientBase } from "./base" ;
3
3
import { IAgentRuntime } from "@elizaos/core" ;
4
- import { Tweet } from "./types " ;
5
- import { SearchMode } from "./search " ;
4
+ import { SearchMode , Tweet } from "agent-twitter-client " ;
5
+ import { buildConversationThread } from "./utils " ;
6
6
7
7
export class TwitterSourceClient {
8
8
client : ClientBase ;
9
9
runtime : IAgentRuntime ;
10
10
private isProcessing : boolean = false ;
11
11
private lastCheckedTweetId : string | null = null ;
12
12
private targetAccounts : string [ ] ;
13
-
13
+
14
14
constructor ( client : ClientBase , runtime : IAgentRuntime ) {
15
15
this . client = client ;
16
16
this . runtime = runtime ;
17
- TWITTER_SOURCE_ACCOUNTS:
18
- runtime . getSetting ( "TWITTER_SOURCE_ACCOUNTS" ) ||
17
+ const accountString = runtime . getSetting ( "TWITTER_SOURCE_ACCOUNTS" ) ||
19
18
process . env . TWITTER_SOURCE_ACCOUNTS ||
20
- "" , // Add this line
21
-
19
+ "" ;
20
+
21
+ this . targetAccounts = accountString
22
+ . split ( ',' )
23
+ . map ( account => account . trim ( ) . replace ( '@' , '' ) ) // Remove @ if present
24
+ . filter ( account => account . length > 0 ) ;
22
25
// Log configuration
23
26
elizaLogger . log ( "Twitter Source Client Configuration:" ) ;
24
- elizaLogger . log ( `- Monitoring accounts: ${ this . targetAccounts . join ( ', ' ) } ` ) ;
27
+ console . log ( this . targetAccounts ) ;
28
+ //elizaLogger.log(`- Monitoring accounts: ${this.targetAccounts.join(', ')}`);
25
29
elizaLogger . log ( `- Poll interval: ${ this . client . twitterConfig . TWITTER_POLL_INTERVAL } seconds` ) ;
26
30
}
27
31
@@ -30,12 +34,15 @@ export class TwitterSourceClient {
30
34
this . monitorSourceAccounts ( ) ;
31
35
setTimeout (
32
36
handleSourceMonitorLoop ,
33
- this . client . twitterConfig . TWITTER_POLL_INTERVAL * 1000
37
+ this . client . twitterConfig . TWITTER_POLL_INTERVAL * 10
34
38
) ;
39
+ //const aixbtUserId = stringToUuid('aixbt_agent');
40
+ //const memories = this.runtime.messageManager.getMemoriesByRoomIds([aixbtUserId]);
41
+ // console.log("LATEST aixbt memory:", memories[0]);
35
42
} ;
36
-
37
43
handleSourceMonitorLoop ( ) ;
38
- elizaLogger . log ( "Source monitoring loop started" ) ;
44
+
45
+ // elizaLogger.log("Source monitoring loop started");
39
46
}
40
47
41
48
private async monitorSourceAccounts ( ) {
@@ -48,44 +55,119 @@ export class TwitterSourceClient {
48
55
49
56
try {
50
57
for ( const username of this . targetAccounts ) {
51
- elizaLogger . log ( `Checking tweets from ${ username } ` ) ;
52
-
58
+ elizaLogger . log ( `Checking tweets from source account ${ username } ` ) ;
59
+ const aixbtUserId = stringToUuid ( 'aixbt_agent' ) ;
60
+ const memories = await this . runtime . messageManager . getMemories ( {
61
+ roomId : aixbtUserId ,
62
+ count : 1 ,
63
+ unique : true
64
+ } ) ;
65
+
66
+ if ( memories . length > 0 ) {
67
+ elizaLogger . log ( 'Latest aiXBT memory:' , {
68
+ id : memories [ 0 ] . id ,
69
+ text : memories [ 0 ] . content . text ,
70
+ timestamp : new Date ( memories [ 0 ] . createdAt ) . toISOString ( )
71
+ } ) ;
72
+ } else {
73
+ elizaLogger . log ( 'No memories found for aiXBT' ) ;
74
+ }
53
75
try {
54
- const userTweets = (
55
- await this . client . twitterClient . fetchSearchTweets (
76
+ // Fetch both tweets and replies from the user
77
+ const [ userTweetsResponse , userRepliesResponse ] = await Promise . all ( [
78
+ // Fetch regular tweets
79
+ this . client . twitterClient . fetchSearchTweets (
56
80
`from:${ username } ` ,
57
- 5 , // Fetch last 5 tweets
81
+ 20 ,
58
82
SearchMode . Latest
59
- )
60
- ) . tweets ;
83
+ ) . catch ( error => {
84
+ elizaLogger . error ( `Error fetching tweets for ${ username } :` , error ) ;
85
+ return { tweets : [ ] } ;
86
+ } ) ,
87
+ // Fetch replies
88
+ this . client . twitterClient . fetchSearchTweets (
89
+ `from:${ username } is:reply` ,
90
+ 20 ,
91
+ SearchMode . Latest
92
+ ) . catch ( error => {
93
+ elizaLogger . error ( `Error fetching replies for ${ username } :` , error ) ;
94
+ return { tweets : [ ] } ;
95
+ } )
96
+ ] ) ;
97
+
98
+ // Combine tweets and replies
99
+ const allTweets = [ ...( userTweetsResponse ?. tweets || [ ] ) , ...( userRepliesResponse ?. tweets || [ ] ) ] ;
61
100
62
- // Filter for unprocessed, non-reply tweets
63
- const validTweets = userTweets . filter ( ( tweet ) => {
64
- const isUnprocessed = ! this . lastCheckedTweetId ||
65
- parseInt ( tweet . id ) > parseInt ( this . lastCheckedTweetId ) ;
66
- const isRecent = Date . now ( ) - tweet . timestamp * 1000 <
67
- 2 * 60 * 60 * 1000 ; // Last 2 hours
101
+ // Filter for unprocessed tweets
102
+ const validTweets = allTweets . filter ( ( tweet ) => {
103
+ if ( ! tweet || ! tweet . id || ! tweet . timestamp ) {
104
+ elizaLogger . warn ( "Invalid tweet object found:" , tweet ) ;
105
+ return false ;
106
+ }
68
107
69
- return isUnprocessed && ! tweet . isReply && ! tweet . isRetweet && isRecent ;
108
+ try {
109
+ const isUnprocessed = ! this . lastCheckedTweetId ||
110
+ parseInt ( tweet . id ) > parseInt ( this . lastCheckedTweetId ) ;
111
+ const isRecent = Date . now ( ) - tweet . timestamp * 1000 <
112
+ 2 * 60 * 60 * 1000 ; // Last 2 hours
113
+
114
+ return isUnprocessed && ! tweet . isRetweet && isRecent ;
115
+ } catch ( error ) {
116
+ elizaLogger . error ( `Error filtering tweet ${ tweet . id } :` , error ) ;
117
+ return false ;
118
+ }
70
119
} ) ;
71
120
72
121
if ( validTweets . length > 0 ) {
73
- elizaLogger . log ( `Found ${ validTweets . length } new tweets from ${ username } ` ) ;
122
+ elizaLogger . log ( `Found ${ validTweets . length } new tweets/replies from ${ username } ` ) ;
123
+
124
+ // For replies, fetch the conversation thread
125
+ for ( const tweet of validTweets ) {
126
+ if ( tweet . isReply ) {
127
+ try {
128
+ const thread = await buildConversationThread ( tweet , this . client ) ;
129
+ // Create a clean version of the thread without circular references
130
+ tweet . conversationThread = thread . map ( t => ( {
131
+ id : t . id ,
132
+ text : t . text ,
133
+ username : t . username ,
134
+ timestamp : t . timestamp ,
135
+ isReply : t . isReply ,
136
+ isRetweet : t . isRetweet ,
137
+ inReplyToStatusId : t . inReplyToStatusId ,
138
+ permanentUrl : t . permanentUrl
139
+ } ) ) ;
140
+ } catch ( error ) {
141
+ elizaLogger . error ( `Error fetching conversation thread for tweet ${ tweet . id } :` , error ) ;
142
+ tweet . conversationThread = [ ] ;
143
+ }
144
+ }
145
+ }
146
+
74
147
await this . processTweets ( validTweets ) ;
75
148
76
149
// Update last checked ID
77
- const latestTweetId = Math . max ( ...validTweets . map ( t => parseInt ( t . id ) ) ) . toString ( ) ;
78
- this . lastCheckedTweetId = latestTweetId ;
79
-
80
- // Cache the latest checked ID
81
- await this . client . cacheManager . set (
82
- `twitter/source/${ username } /lastChecked` ,
83
- { id : latestTweetId , timestamp : Date . now ( ) }
84
- ) ;
150
+ const validIds = validTweets
151
+ . filter ( t => t . id )
152
+ . map ( t => parseInt ( t . id ) )
153
+ . filter ( id => ! isNaN ( id ) ) ;
154
+
155
+ if ( validIds . length > 0 ) {
156
+ const latestTweetId = Math . max ( ...validIds ) . toString ( ) ;
157
+ this . lastCheckedTweetId = latestTweetId ;
158
+
159
+ // Cache the latest checked ID
160
+ await this . client . cacheManager . set (
161
+ `twitter/source/${ username } /lastChecked` ,
162
+ { id : latestTweetId , timestamp : Date . now ( ) }
163
+ ) . catch ( error => {
164
+ elizaLogger . error ( `Error caching latest tweet ID for ${ username } :` , error ) ;
165
+ } ) ;
166
+ }
85
167
}
86
168
87
169
} catch ( error ) {
88
- elizaLogger . error ( `Error fetching tweets for ${ username } :` , error ) ;
170
+ elizaLogger . error ( `Error processing tweets for ${ username } :` , error ) ;
89
171
continue ;
90
172
}
91
173
}
@@ -97,25 +179,64 @@ export class TwitterSourceClient {
97
179
private async processTweets ( tweets : Tweet [ ] ) {
98
180
for ( const tweet of tweets ) {
99
181
try {
182
+ if ( ! tweet . id || ! tweet . text || ! tweet . timestamp ) {
183
+ elizaLogger . warn ( "Skipping invalid tweet:" , tweet ) ;
184
+ continue ;
185
+ }
186
+
187
+ // Serialize conversation thread to avoid circular references
188
+ /*const serializedThread = tweet.conversationThread?.map(t => ({
189
+ id: t.id,
190
+ text: t.text,
191
+ username: t.username,
192
+ timestamp: t.timestamp,
193
+ isReply: t.isReply,
194
+ isRetweet: t.isRetweet,
195
+ inReplyToStatusId: t.inReplyToStatusId,
196
+ permanentUrl: t.permanentUrl
197
+ }));*/
198
+
199
+ const roomId = stringToUuid ( tweet . conversationId ) ;
200
+ elizaLogger . log ( "Creating new memory:" , {
201
+ id : tweet . id + "-" + this . runtime . agentId ,
202
+ text : tweet . text ,
203
+ source : "twitter" ,
204
+ url : tweet . permanentUrl ,
205
+ inReplyTo : tweet . inReplyToStatusId ,
206
+ createdAt : new Date ( tweet . timestamp * 1000 ) . toISOString ( ) ,
207
+ roomId : roomId ,
208
+ userId : tweet . userId === this . twitterUserId ? this . runtime . agentId : tweet . userId
209
+ } ) ;
100
210
// Create memory entry for the tweet
101
- await this . runtime . messageManager . createMemory ( {
102
- id : stringToUuid ( tweet . id + "-source-" + this . runtime . agentId ) ,
103
- userId : this . runtime . agentId ,
211
+ this . runtime . messageManager . createMemory ( {
212
+ id : stringToUuid (
213
+ tweet . id + "-" + this . runtime . agentId
214
+ ) ,
104
215
agentId : this . runtime . agentId ,
105
216
content : {
106
217
text : tweet . text ,
218
+ source : "twitter" ,
107
219
url : tweet . permanentUrl ,
108
- source : "twitter_source" ,
109
- sourceUser : tweet . username
220
+ inReplyTo : tweet . inReplyToStatusId
221
+ ? stringToUuid (
222
+ tweet . inReplyToStatusId +
223
+ "-" +
224
+ this . runtime . agentId
225
+ )
226
+ : undefined ,
110
227
} ,
111
- roomId : stringToUuid ( "twitter-source-" + this . runtime . agentId ) ,
228
+ createdAt : tweet . timestamp * 1000 ,
229
+ roomId,
230
+ userId :
231
+ tweet . userId === this . twitterUserId
232
+ ? this . runtime . agentId
233
+ : stringToUuid ( tweet . userId ) ,
112
234
embedding : getEmbeddingZeroVector ( ) ,
113
- createdAt : tweet . timestamp ,
114
235
} ) ;
115
236
116
- elizaLogger . log ( `Processed tweet from ${ tweet . username } : ${ tweet . id } ` ) ;
237
+ elizaLogger . log ( `Processed ${ tweet . isReply ? 'reply' : 'tweet' } from ${ tweet . username } : ${ tweet . id } ` ) ;
117
238
} catch ( error ) {
118
- elizaLogger . error ( `Error processing tweet ${ tweet . id } :` , error ) ;
239
+ elizaLogger . error ( `Error processing tweet ${ tweet . id } :` , error ?. message || error ) ;
119
240
}
120
241
}
121
242
}
0 commit comments