-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathstream.py
99 lines (76 loc) · 2.46 KB
/
stream.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import codecs
from tweepy import Stream
from tweepy.streaming import StreamListener
import time
import argparse
import string
import json
from get_config import get_config_keys
import sys
def get_parser():
    """Build the command line parser for the streamer.

    Return:
    ArgumentParser -- parser that knows -q/--query (default '-') and
                      -d/--data-dir (no default)
    """
    cli = argparse.ArgumentParser(description="Sominer Twitter Streamer")
    # Each entry is (option flags, keyword arguments for add_argument).
    option_specs = (
        (("-q", "--query"),
         {"dest": "query", "help": "Query/Filter", "default": '-'}),
        (("-d", "--data-dir"),
         {"dest": "data_dir", "help": "Output/Data Directory"}),
    )
    for flags, settings in option_specs:
        cli.add_argument(*flags, **settings)
    return cli
class Listener(StreamListener):
    """Custom StreamListener that appends every received message to a file.

    Each message is stored as one JSON document per line in
    "<data_dir>/stream_<query>.json", where <query> is the filter string
    after it has been passed through format_filename().
    """

    def __init__(self, data_dir, query):
        """Initialise the listener and work out the output file path.

        Arguments:
        data_dir -- directory that receives the output file
        query -- filter string; sanitised before it is used in the file name
        """
        # Let the base class run its own initialiser before adding our state.
        super().__init__()
        query_fname = format_filename(query)
        self.outfile = "%s/stream_%s.json" % (data_dir, query_fname)

    def on_data(self, data):
        """Append one received message to the output file.

        Arguments:
        data -- JSON text of a single message
        Return:
        Boolean -- always True, also after a failure has been reported
        """
        try:
            # Parse and re-serialise before touching the file, so malformed
            # input is rejected up front and each record is written in one go.
            line = json.dumps(json.loads(data)) + "\n"
            with codecs.open(self.outfile, 'a', encoding="utf-8") as f:
                f.write(line)
            return True
        except Exception as e:
            # Exception, not BaseException: KeyboardInterrupt and SystemExit
            # have to propagate, otherwise the process cannot be stopped.
            print("Error on_data: %s" % str(e))
            # Short pause so a persistent failure does not flood the console.
            time.sleep(5)
        return True

    def on_error(self, status):
        """Print the error status reported by the stream.

        Arguments:
        status -- the status value handed over by the stream
        Return:
        Boolean -- always True
        """
        print(status)
        return True
def format_filename(fname):
    """Turn an arbitrary string into one that is safe as a file name.

    Every character is passed through convert_valid() and the results are
    glued back together in the original order.

    Arguments:
    fname -- the file name to convert
    Return:
    String -- converted file name
    """
    safe_chars = map(convert_valid, fname)
    return ''.join(safe_chars)
def convert_valid(one_char):
    """Map a character to itself if it is allowed, otherwise to '_'.

    Allowed are '-', '_', '.', the ASCII letters and the decimal digits.

    Arguments:
    one_char -- the char to convert
    Return:
    Character -- converted char
    """
    allowed = "-_." + string.ascii_letters + string.digits
    return one_char if one_char in allowed else '_'
@classmethod
def parse(cls, api, raw):
    """Parse a raw payload and keep its JSON text on the result.

    Delegates to cls.first_parse(api, raw) and stores json.dumps(raw) on the
    returned object as the attribute 'json'.

    NOTE(review): this definition sits at module level, so the classmethod
    decorator is not bound to any class here -- presumably meant to be
    attached to a model class elsewhere; confirm before relying on it.

    Arguments:
    api -- passed through unchanged to cls.first_parse
    raw -- the raw, JSON-serialisable payload
    Return:
    whatever cls.first_parse returned, with the extra 'json' attribute
    """
    parsed = cls.first_parse(api, raw)
    parsed.json = json.dumps(raw)
    return parsed
if __name__ == '__main__':
    # Script entry point: parse the command line, authenticate and start
    # streaming messages that match the query into the data directory.
    parser = get_parser()
    if len(sys.argv) == 1:
        # Called without any arguments: show the usage text and stop with
        # exit status 0, instead of relying on parse_args(['-h']) raising
        # SystemExit inside a print() call that never runs.
        parser.print_help()
        sys.exit(0)
    args = parser.parse_args()
    if args.data_dir is None:
        # Without a directory the output path would start with "None/" and
        # every write would fail; report the problem up front instead.
        parser.error("an output directory is required (-d/--data-dir)")
    api, auth = get_config_keys()
    twitter_stream = Stream(auth, Listener(args.data_dir, args.query))
    twitter_stream.filter(track=[args.query])