app.py
import logging
import sys
import pathway as pw
import yaml
from dotenv import load_dotenv
from pathway.udfs import DiskCache, ExponentialBackoffRetryStrategy
from pathway.xpacks.llm import embedders, llms, parsers, splitters
from pathway.xpacks.llm.question_answering import BaseRAGQuestionAnswerer
from pathway.xpacks.llm.vector_store import VectorStoreServer
# To use advanced features with Pathway Scale, get your free license key from
# https://pathway.com/features and paste it below.
# To use Pathway Community, comment out the line below.
# pw.set_license_key("demo-license-key-with-telemetry")
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(name)s %(levelname)s %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)

# Load environment variables (e.g. the OpenAI API key) from a local .env file.
load_dotenv()
def get_data_sources(kafka_topic):
    """Create a streaming Pathway input that reads messages from a Kafka topic."""
    rdkafka_settings = {
        "bootstrap.servers": "localhost:9092",
        "security.protocol": "plaintext",
        "group.id": "0",
        "session.timeout.ms": "6000",
        # "sasl.username": "username",
        # "sasl.password": "********",
    }
    source = pw.io.kafka.read(
        rdkafka_settings,
        topic=kafka_topic,
        # schema=InputSchema,
        # format="csv",
        # autocommit_duration_ms=1000
    )
    return source
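
# --- Optional local-testing helper (not part of the original app) ---
# A minimal sketch of how documents could be pushed into the Kafka topic that
# get_data_sources() reads from. It assumes a broker on localhost:9092 and the
# `confluent-kafka` package; the function name and sample payloads are
# illustrative only, and nothing in this app calls it.
def produce_sample_docs(topic="data2"):
    from confluent_kafka import Producer  # imported lazily: only needed for this helper

    producer = Producer({"bootstrap.servers": "localhost:9092"})
    for text in [
        "Pathway is a stream processing framework.",
        "The vector store indexes documents as they arrive.",
    ]:
        producer.produce(topic, value=text.encode("utf-8"))
    producer.flush()  # block until the messages are delivered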
def run():
    # Optionally load settings from a YAML config instead of hard-coding them:
    # with open(config_file) as config_f:
    #     configuration = yaml.safe_load(config_f)

    # Embedder used to index incoming documents.
    embedder = embedders.OpenAIEmbedder(
        model="text-embedding-3-small",
        cache_strategy=DiskCache(),
    )

    # LLM for the (currently disabled) RAG question-answering endpoint below.
    chat = llms.OpenAIChat(
        model="gpt-4o-mini",
        retry_strategy=ExponentialBackoffRetryStrategy(max_retries=3),
        cache_strategy=DiskCache(),
        temperature=0.1,
    )

    kafka_source = get_data_sources("data2")
    print("got source", flush=True)

    doc_store = VectorStoreServer(
        kafka_source,  # pw.io.kafka.read
        embedder=embedder,  # embedders.OpenAIEmbedder
        splitter=splitters.TokenCountSplitter(max_tokens=400),
        parser=parsers.ParseUnstructured(),
    )

    print("starting app", flush=True)
    doc_store.run_server(host="0.0.0.0", port=8090, with_cache=False, terminate_on_error=True)
    print("started app", flush=True)

    # To expose a full RAG endpoint instead of the bare vector store, replace the
    # run_server call above with:
    # rag_app = BaseRAGQuestionAnswerer(llm=chat, indexer=doc_store)
    # rag_app.build_server(host="0.0.0.0", port=8090)
    # rag_app.run_server(with_cache=True, terminate_on_error=False)
if __name__ == "__main__":
    run()
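
# --- Example client call (not part of the original app) ---
# A minimal sketch of how the running server could be queried from another
# process, assuming Pathway's VectorStoreClient with a host/port constructor
# and a query() method; adjust if your Pathway version exposes a different
# client API.
#
#   from pathway.xpacks.llm.vector_store import VectorStoreClient
#
#   client = VectorStoreClient(host="localhost", port=8090)
#   print(client.query("What did the latest documents say?", k=3))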