load_json.py
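"""
Bulk-load a JSONL file into the Cassandra / DSE table named by the DSE_TABLE
environment variable. Each input line is expected to be a JSON object with
"id", "title", and "titleVector" fields; rows are written in unlogged batches.
"""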
import os
import sys
import json
import time

from dotenv import load_dotenv
from cassandra.query import BatchStatement, BatchType

from conn import getCQLSession

load_dotenv()
# MODE selects the connection profile used by conn.getCQLSession; DSE_TABLE names
# the target table. Both are read from the environment (.env via load_dotenv).
session = getCQLSession(os.environ["MODE"])
table = os.environ["DSE_TABLE"]

# Prepared INSERT with named bind markers; rows are bound as dicts below.
cmd_insert = f"""
INSERT INTO {table} (id, title, emb)
VALUES (:id, :title, :emb)
"""
prepared_stmt_insert = session.prepare(cmd_insert)
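
# Illustrative sketch of the expected target schema (an assumption — the actual
# keyspace, key, types, and vector dimension depend on how DSE_TABLE was created):
#
#   CREATE TABLE my_keyspace.articles (
#       id    int PRIMARY KEY,
#       title text,
#       emb   vector<float, <dim>>   -- dimension of the titleVector embeddings
#   );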
def load_jsonl(file_path, batch_size=50, skip=0):
    """Stream a JSONL file into the target table using unlogged batches.

    Returns the total number of lines consumed (including skipped lines).
    """
    batch = BatchStatement(batch_type=BatchType.UNLOGGED)
    count = 0
    start = time.time()
    print("Starting")
    with open(file_path, 'r') as file:
        # Skip already-loaded lines so an interrupted run can be resumed.
        print(f"Skipping {skip} lines")
        for _ in range(skip):
            next(file)
        count = skip
        print(f"Skipped {count} records")
        for line in file:
            try:
                count += 1
                row = json.loads(line.strip())
                batch.add(prepared_stmt_insert,
                          {"id": row['id'], "title": row['title'], "emb": row["titleVector"]})
                # Flush the batch every batch_size rows.
                if count % batch_size == 0:
                    session.execute(batch)
                    batch.clear()
                # Report throughput every 10,000 rows.
                if count % 10000 == 0:
                    end = time.time()
                    print(f"Time to load: {end - start:.2f}s : {count} records "
                          f"({10000 / (end - start):.1f} recs/sec)")
                    start = time.time()
            except json.JSONDecodeError as e:
                print(f"Error decoding JSON in line: {e}")
        # Flush whatever is left in the final, partially filled batch.
        session.execute(batch)
        batch.clear()
    return count
def main():
    filepath = sys.argv[1]
    skip = int(sys.argv[2]) if len(sys.argv) > 2 else 0
    print(f"Loading file {filepath}")
    print(f"Skipping first {skip} lines")
    start = time.time()
    count = load_jsonl(filepath, 100, skip)
    end = time.time()
    print(f"Time to load: {end - start:.2f}s : {count} records")


if __name__ == "__main__":
    main()
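
# Example invocations (the file name is illustrative; the .env file must provide
# MODE and DSE_TABLE, and conn.getCQLSession must return a connected session):
#
#   python load_json.py articles.jsonl          # load the whole file
#   python load_json.py articles.jsonl 50000    # resume, skipping the first 50,000 lines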