-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathconn.py
66 lines (58 loc) · 2.19 KB
/
conn.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
# Astra Connection (Based on cqlsession, from cassio library)
import os
from cassandra.cluster import (
Cluster,
)
from cassandra.auth import PlainTextAuthProvider
from cassandra.query import dict_factory
from cassandra.cluster import Cluster, ExecutionProfile, EXEC_PROFILE_DEFAULT
from cassandra.policies import WhiteListRoundRobinPolicy, DowngradingConsistencyRetryPolicy
from cassandra.query import tuple_factory
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
from cassandra.policies import AddressTranslator
class StaticTranslator(AddressTranslator):
    """
    Address translator that maps every node address to one fixed address.

    Whatever address the driver hands to ``translate``, the configured
    ``target`` is returned instead (``127.0.0.1`` unless another value is
    given to the constructor).

    NOTE(review): presumably meant for clusters reached through a local
    port-forward or tunnel, where advertised node addresses are not
    routable from the client -- confirm with the author.
    """

    def __init__(self, target="127.0.0.1"):
        """Remember the address every translation should resolve to."""
        super().__init__()
        self.target = target

    def translate(self, addr):
        """Return the fixed target address; ``addr`` is ignored."""
        return self.target
def getCQLSession(mode='astra_db'):
    """
    Open a Cassandra driver session for the requested backend and return it.

    Supported values of ``mode``:

    * ``'astra_db'`` -- connect with the secure connect bundle located at
      ``ASTRA_DB_SECURE_BUNDLE_PATH`` and authenticate with
      ``ASTRA_DB_CLIENT_ID`` / ``ASTRA_DB_CLIENT_SECRET``. The session's
      ``row_factory`` is set to ``dict_factory``.
    * ``'dse'`` -- connect to the comma-separated contact points listed in
      ``DSE_NODES`` and authenticate with ``DSE_USER`` / ``DSE_PASS``. Node
      addresses are passed through ``StaticTranslator``.
    * ``'local'`` -- connect with the driver's default ``Cluster()`` settings.

    All settings are read from environment variables.

    Returns:
        The connected driver session.

    Raises:
        ValueError: if ``mode`` is not one of the values above.
        KeyError: if an environment variable required by the chosen mode
            is not set.
    """
    print('Initializing CQL Session')

    if mode == 'astra_db':
        auth_provider = PlainTextAuthProvider(
            os.environ["ASTRA_DB_CLIENT_ID"],
            os.environ["ASTRA_DB_CLIENT_SECRET"],
        )
        cluster = Cluster(
            cloud={
                "secure_connect_bundle": os.environ["ASTRA_DB_SECURE_BUNDLE_PATH"],
            },
            auth_provider=auth_provider,
        )
        astraSession = cluster.connect()
        # Only this mode switches rows to dicts; the other modes keep the
        # driver's default row factory.
        astraSession.row_factory = dict_factory
        print('Connected')
        return astraSession

    if mode == 'dse':
        nodes = os.environ["DSE_NODES"]
        print(nodes)
        user = os.environ["DSE_USER"]
        print(user)
        # The password is deliberately NOT echoed: printing it would leak the
        # credential into stdout and any captured logs.
        password = os.environ["DSE_PASS"]

        # NOTE(review): an ExecutionProfile (WhiteListRoundRobinPolicy on
        # 127.0.0.1, request_timeout=15) used to be built here but was never
        # handed to Cluster, so it had no effect and has been dropped. If that
        # policy is wanted, pass it to Cluster via
        # execution_profiles={EXEC_PROFILE_DEFAULT: profile}.
        cluster = Cluster(
            nodes.split(','),
            address_translator=StaticTranslator(),
            connect_timeout=30,
            auth_provider=PlainTextAuthProvider(username=user, password=password),
        )
        dseSession = cluster.connect()
        return dseSession

    if mode == 'local':
        cluster = Cluster()
        localSession = cluster.connect()
        return localSession

    raise ValueError('Unknown CQL Session mode')