Skip to content

Commit

Permalink
feat: added cli/ui to control flaked scheduler
Browse files Browse the repository at this point in the history
  • Loading branch information
ymarcon committed Feb 20, 2025
1 parent 048aeda commit c2b29a2
Show file tree
Hide file tree
Showing 11 changed files with 487 additions and 373 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/python-package.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ name: Python package

on:
push:
branches: [ "main" ]
branches: [ "dev", "main" ]
pull_request:
branches: [ "main" ]

Expand Down
4 changes: 1 addition & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,11 @@ install:
poetry install

test:
poetry run pytest
poetry run pytest -s

build:
poetry build

clean:
rm -rf dist

local-install:
pip install ./dist/*.tar.gz
43 changes: 35 additions & 8 deletions flake/app.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import typer
from logging import INFO, basicConfig, info, warning, error
from flake.services.scheduler import start
from .views.ui import ttk, FlakeUI, theme_name
from .views.scheduler import SchedulerView

# Initialise the Typer class
app = typer.Typer(
Expand All @@ -9,17 +10,43 @@
pretty_exceptions_show_locals=False,
)

default_url = "http://127.0.0.1:8000"

@app.command()
def config():
typer.echo("Config command")

def ui(url: str = default_url):
typer.echo("UI command")
app = ttk.Window(
title="Flake",
themename=theme_name,
resizable=(False, False)
)
FlakeUI(app, url=url)
app.mainloop()

@app.command()
def scheduler():
typer.echo("Scheduler command")
start()

def status(url: str = default_url):
print(SchedulerView(url).get_status())

@app.command()
def start(url: str = default_url):
print(SchedulerView(url).start())

@app.command()
def stop(url: str = default_url):
print(SchedulerView(url).stop())

@app.command()
def restart(url: str = default_url):
print(SchedulerView(url).restart())

@app.command()
def pause(url: str = default_url):
print(SchedulerView(url).pause())

@app.command()
def resume(url: str = default_url):
print(SchedulerView(url).resume())

def main() -> None:
"""The main function of the application
Expand Down
73 changes: 31 additions & 42 deletions flake/services/scheduler.py
Original file line number Diff line number Diff line change
@@ -1,43 +1,32 @@
from apscheduler.schedulers.background import BackgroundScheduler
from tenacity import retry, stop_after_attempt, wait_fixed
import logging
import time
import requests

# Configure logging
logging.basicConfig(
filename='data_processing.log',
level=logging.ERROR,
format='%(asctime)s - %(levelname)s - %(message)s',
)

# Define the pipeline
@retry(stop=stop_after_attempt(3), wait=wait_fixed(5))
def process_data():
try:
print("Step 1: Extracting data...")
time.sleep(1) # Simulate work
print("Step 2: Transforming data...")
time.sleep(1)
# Simulate an error
if time.time() % 2 < 1:
raise ValueError("Random simulated error!")
print("Step 3: Loading data...")
time.sleep(1)
print("Pipeline finished successfully.")
except Exception as e:
logging.error("Pipeline failed", exc_info=True)
raise

# Schedule the pipeline
def start():
scheduler = BackgroundScheduler()
scheduler.add_job(process_data, 'interval', minutes=1) # Run every minute
scheduler.start()

print("Scheduler started. Press Ctrl+C to exit.")
try:
while True:
time.sleep(2)
except (KeyboardInterrupt, SystemExit):
scheduler.shutdown()
print("Scheduler stopped.")
class SchedulerService:

def __init__(self, url: str):
self.url = url


def get_status(self):
# request GET /scheduler/status
resp = requests.get(f"{self.url}/scheduler/status")
return resp.json()

def start(self):
# request PUT /scheduler?action=start
resp = requests.put(f"{self.url}/scheduler/status", params={"action": "start"})
return resp.json()

def stop(self):
# request PUT /scheduler?action=stop
resp = requests.put(f"{self.url}/scheduler/status", params={"action": "stop"})
return resp.json()

def pause(self):
# request PUT /scheduler?action=pause
resp = requests.put(f"{self.url}/scheduler/status", params={"action": "pause"})
return resp.json()

def resume(self):
# request PUT /scheduler?action=resume
resp = requests.put(f"{self.url}/scheduler/status", params={"action": "resume"})
return resp.json()
25 changes: 25 additions & 0 deletions flake/views/scheduler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
from ..services.scheduler import SchedulerService

class SchedulerView:

def __init__(self, url: str):
self.service = SchedulerService(url)

def get_status(self):
return self.service.get_status()

def start(self):
return self.service.start()

def stop(self):
return self.service.stop()

def restart(self):
self.stop()
return self.start()

def pause(self):
return self.service.pause()

def resume(self):
return self.service.resume()
148 changes: 148 additions & 0 deletions flake/views/ui.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
import ttkbootstrap as ttk
from ttkbootstrap.style import Style
from ttkbootstrap.constants import *
from ..services.scheduler import SchedulerService

theme_name = "flatly"

class ColorCircle:
def __init__(self, parent, size):
self.canvas = ttk.Canvas(
parent,
width=size,
height=size,
background="white",
highlightthickness=0
)
# Create the circle and store its id
self.circle_id = self.canvas.create_oval(
0, 0, size, size,
fill="white", # default color
outline=""
)

def set_color(self, color):
# Update circle color
self.canvas.itemconfig(self.circle_id, fill=color)

def pack(self, *args, **kwargs):
self.canvas.pack(*args, **kwargs)

class FlakeUI(ttk.Frame):

def __init__(self, master, url: str):
super().__init__(master)
self.url = url
self.scheduler = SchedulerService(url)
self.pack(fill=BOTH, expand=YES)
self.status_text = ttk.StringVar(value=self.scheduler.get_status()["status"])

self.create_status_widgets()
self.create_status_controls()
self.update_status_display()

def is_running(self):
return self.status_text.get() == "running"

def is_stopped(self):
return self.status_text.get() == "stopped"

def is_paused(self):
return self.status_text.get() == "paused"

def create_status_widgets(self):
"""Create the status display"""
container = ttk.Frame(self, padding=10)
container.pack(fill=X)
self.circle = ColorCircle(container, 50)
self.circle.pack(side=LEFT, padx=20)
lbl = ttk.Label(
master=container,
font="-size 16",
#anchor=CENTER,
textvariable=self.status_text,
)
lbl.pack(side=TOP, fill=X, padx=20, pady=20)

def create_status_controls(self):
"""Create the control frame with buttons"""
container = ttk.Frame(self, padding=10)
container.pack(fill=X)
self.buttons = []
self.buttons.append(
ttk.Button(
master=container,
text= "?",
width=10,
bootstyle=INFO,
command=self.on_toggle,
)
)
self.buttons.append(
ttk.Button(
master=container,
text="Restart",
width=10,
bootstyle=SUCCESS,
command=self.on_restart,
)
)
self.buttons.append(
ttk.Button(
master=container,
text="Stop",
width=10,
bootstyle=DANGER,
command=self.on_stop,
)
)
for button in self.buttons:
button.pack(side=LEFT, fill=X, expand=YES, pady=0, padx=5)

def on_toggle(self):
"""Toggle the start and pause button."""
if self.is_running():
self.on_pause()
elif self.is_paused():
self.on_resume()
else:
self.on_start()

def update_status_display(self):
button = self.buttons[0]
if self.is_paused():
button.configure(bootstyle=INFO, text="Resume")
self.circle.set_color("orange")
elif self.is_running():
button.configure(bootstyle=INFO, text="Pause")
self.circle.set_color("green")
else:
button.configure(bootstyle=INFO, text="Start")
self.circle.set_color("red")

def on_start(self):
resp = self.scheduler.start()
self.status_text.set(resp["status"])
self.update_status_display()

def on_stop(self):
resp = self.scheduler.stop()
self.status_text.set(resp["status"])
self.update_status_display()

def on_restart(self):
resp = self.scheduler.stop()
self.status_text.set(resp["status"])
resp = self.scheduler.start()
self.status_text.set(resp["status"])
self.update_status_display()

def on_pause(self):
resp = self.scheduler.pause()
self.status_text.set(resp["status"])
self.update_status_display()

def on_resume(self):
resp = self.scheduler.resume()
self.status_text.set(resp["status"])
self.update_status_display()
Loading

0 comments on commit c2b29a2

Please sign in to comment.