Skip to content

Commit

Permalink
feat: add orchestrator logic
Browse files Browse the repository at this point in the history
  • Loading branch information
AlanAlvarez21 committed Feb 7, 2025
1 parent b88ab0e commit 80121fc
Show file tree
Hide file tree
Showing 5 changed files with 270 additions and 0 deletions.
51 changes: 51 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,57 @@ CREATE TABLE api_data(
);
```

## Orchestrator: Manager for Scheduling and Running Scripts ##

This module manages the execution of scheduled bots based on time intervals, specific times, or days. It uses a **ThreadPool** to run the scripts concurrently, ensuring efficient execution.

### Adding Schedules to Your Repository

To use the Orchestrator for scheduling and executing your bots, you need to define a `schedules` array in your repository. This array should include the paths to your scripts and the schedules for execution.

### Example of the `schedules` defintion

```ruby
BIRTHDAY_SCHEDULES = [
# Execute every 1000ms (1 second)
{ path: '/birthday/fetch_birthday_from_notion.rb', interval: 1000 },
{ path: '/birthday/format_birthday.rb', interval: 1000 },
{ path: '/birthday/notify_birthday_in_discord.rb', interval: 1000 },
{ path: '/birthday/garbage_collector.rb', interval: 1000 },
].freeze

# With days and hours
# Execute at 08:00 AM on Mondays
{ path: '/birthday/notify_birthday_in_email.rb', day: ['Monday'], time: ['08:00'] }

```

### How to Use the Orchestrator

Once you've defined your schedules, you can initialize and run the Orchestrator to begin executing your scripts based on their schedules.


```ruby

# Initialize the orchestrator with the defined schedules
manager = Bas::Orchestrator::Manager.new(BIRTHDAY_SCHEDULES)

# Run the orchestrator
manager.run

```
### Folder structure example:

```bash
src/use_cases_execution/
├── birthday/
│ ├── fetch_birthday_from_notion.rb
│ ├── format_birthday.rb
│ ├── notify_birthday_in_discord.rb
│ └── garbage_collector.rb
└── schedules.rb
```

### Implementation examples

#### Example 1: Using the Same Shared Storage for Reading and Writing
Expand Down
18 changes: 18 additions & 0 deletions lib/bas/orchestrator.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# frozen_string_literal: true

require "bas/orchestrator/manager"
module Bas
# The Orchestrator module is responsible for managing the scheduling and execution
# of scripts within the business automation services. It provides a high-level
# interface to start the orchestration process using the `Manager` class.
#
module Orchestrator
# Starts the orchestration process with the given schedules.
#
# @param schedules [Array<Hash>] A list of scripts with execution details.
def self.start(schedules)
manager = Manager.new(schedules)
manager.run
end
end
end
92 changes: 92 additions & 0 deletions lib/bas/orchestrator/manager.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
# frozen_string_literal: true

require "concurrent-ruby"

module Bas
module Orchestrator
##
# Manager class responsible for scheduling and executing scripts concurrently.
#
# This class initializes a thread pool and processes scheduled scripts based on
# time intervals, specific days, or exact times.
#
class Manager
def initialize(schedules)
@last_executions = Hash.new(0.0)
@schedules = schedules
@pool = Concurrent::FixedThreadPool.new(@schedules.size)
end

def run
@schedules.each { |script| @pool.post { process_script(script) } }

@pool.shutdown
@pool.wait_for_termination
end

private

def process_script(script)
loop do
@actual_time = Time.new

execute_interval(script) if interval?(script)
execute_day(script) if day?(script) && time?(script)
execute_time(script) if time?(script) && !day?(script)

sleep 0.1
rescue StandardError => e
puts "Error in thread: #{e.message}"
end
end

def execute_interval(script)
return unless time_in_milliseconds - @last_executions[script[:path]] >= script[:interval]

execute(script)
@last_executions[script[:path]] = time_in_milliseconds
end

def execute_day(script)
return unless script[:day].include?(current_day) && script[:time].include?(current_time)

execute(script) unless @last_executions[script[:path]].eql?(current_time)
@last_executions[script[:path]] = current_time
end

def execute_time(script)
execute(script) if script[:time].include?(current_time) && !@last_executions[script[:path]].eql?(current_time)
@last_executions[script[:path]] = current_time
end

def interval?(script)
script[:interval]
end

def time?(script)
script[:time]
end

def day?(script)
script[:day]
end

def time_in_milliseconds
@actual_time.to_f * 1000
end

def current_time
@actual_time.strftime("%H:%M")
end

def current_day
@actual_time.strftime("%A")
end

def execute(script)
puts "Executing #{script[:path]} at #{current_time}"
system("ruby ", script[:path])
end
end
end
end
80 changes: 80 additions & 0 deletions spec/bas/orchestrator/manager_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
# frozen_string_literal: true

require "spec_helper"
require "bas/orchestrator/manager"

RSpec.describe Bas::Orchestrator::Manager do
let(:schedules) do
[
{ path: "websites_availability/fetch_domain_services_from_notion.rb", interval: 600_000 },
{ path: "websites_availability/notify_domain_availability.rb", interval: 60_000 },
{ path: "websites_availability/garbage_collector.rb", time: ["00:00"] },
{ path: "pto_next_week/fetch_next_week_pto_from_notion.rb", time: ["12:40"], day: ["Monday"] },
{ path: "pto/fetch_pto_from_notion.rb", time: ["13:10"] }
]
end

let(:manager) { described_class.new(schedules) }

before do
allow(manager).to receive(:current_time).and_return("12:40")
allow(manager).to receive(:current_day).and_return("Monday")
allow(manager).to receive(:time_in_milliseconds).and_return(10_000)
allow(manager).to receive(:system).and_return(true)
end

describe "#execute_interval" do
it "executes scripts when interval has elapsed" do
script = schedules[0]
manager.instance_variable_set(:@last_executions, { script[:path] => 0 })
allow(manager).to receive(:time_in_milliseconds).and_return(600_000)

expect { manager.send(:execute_interval, script) }.to(change do
manager.instance_variable_get(:@last_executions)[script[:path]]
end)
end

it "does not execute script if interval has not elapsed" do
script = schedules[0]
manager.instance_variable_set(:@last_executions, { script[:path] => 0 })
allow(manager).to receive(:time_in_milliseconds).and_return(10_000)

expect { manager.send(:execute_interval, script) }.not_to(change do
manager.instance_variable_get(:@last_executions)[script[:path]]
end)
end
end

describe "#execute_time" do
it "executes scripts at exact time" do
script = schedules[2]
allow(manager).to receive(:current_time).and_return("00:00")

expect { manager.send(:execute_time, script) }.to(change do
manager.instance_variable_get(:@last_executions)[script[:path]]
end)
end
end

describe "#execute_day" do
it "executes scripts at specific time and day" do
script = schedules[3]
allow(manager).to receive(:current_time).and_return("12:40")
allow(manager).to receive(:current_day).and_return("Monday")

expect { manager.send(:execute_day, script) }.to(change do
manager.instance_variable_get(:@last_executions)[script[:path]]
end)
end

it "does not execute script if time is correct but the day is incorrect" do
script = schedules[3]
allow(manager).to receive(:current_time).and_return("12:40")
allow(manager).to receive(:current_day).and_return("Tuesday")

expect { manager.send(:execute_day, script) }.not_to(change do
manager.instance_variable_get(:@last_executions)[script[:path]]
end)
end
end
end
29 changes: 29 additions & 0 deletions spec/bas/orchestrator_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# frozen_string_literal: true

require "spec_helper"
require "bas/orchestrator"

RSpec.describe Bas::Orchestrator do
let(:schedules) do
[
{ path: "websites_availability/fetch_domain_services_from_notion.rb", interval: 600_000 },
{ path: "websites_availability/notify_domain_availability.rb", interval: 60_000 },
{ path: "websites_availability/garbage_collector.rb", time: ["00:00"] },
{ path: "pto_next_week/fetch_next_week_pto_from_notion.rb", time: ["12:40"], day: ["Monday"] },
{ path: "pto/fetch_pto_from_notion.rb", time: ["13:10"] }
]
end

let(:manager) { instance_double(Bas::Orchestrator::Manager, run: true) }

before do
allow(Bas::Orchestrator::Manager).to receive(:new).with(schedules).and_return(manager)
end

describe ".start" do
it "initializes and runs the manager" do
expect(manager).to receive(:run)
described_class.start(schedules)
end
end
end

0 comments on commit 80121fc

Please sign in to comment.