Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement stale tasks #3

Merged
merged 6 commits into from
Feb 19, 2025
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ spl-token = "4.0.0"
itertools = "0.13"
tokio-graceful-shutdown = "0.15"
solana-transaction-utils = { version = "0.1.0", path = "./solana-transaction-utils" }
tuktuk-sdk = { version = "0.1.0", path = "./tuktuk-sdk" }
tuktuk-program = { version = "0.1.0", path = "./tuktuk-program" }
tuktuk-sdk = { version = "0.2.0", path = "./tuktuk-sdk" }
tuktuk-program = { version = "0.2.0", path = "./tuktuk-program" }
# solana-transaction-utils = { version = "0.1.0" }
# tuktuk-sdk = { version = "0.1.0" }
# tuktuk-program = { version = "0.1.0" }
Expand Down
2 changes: 1 addition & 1 deletion solana-programs/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion solana-programs/programs/tuktuk/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "tuktuk"
version = "0.1.3"
version = "0.2.0"
description = "Created with Anchor"
edition = "2021"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ pub struct InitializeTaskQueueArgsV0 {
pub name: String,
pub capacity: u16,
pub lookup_tables: Vec<Pubkey>,
pub stale_task_age: u32,
}

pub fn hash_name(name: &str) -> [u8; 32] {
Expand Down Expand Up @@ -81,6 +82,7 @@ pub fn handler(ctx: Context<InitializeTaskQueueV0>, args: InitializeTaskQueueArg
created_at: Clock::get()?.unix_timestamp,
updated_at: Clock::get()?.unix_timestamp,
num_queue_authorities: 0,
stale_task_age: args.stale_task_age,
});
ctx.accounts
.task_queue_name_mapping
Expand Down
19 changes: 14 additions & 5 deletions solana-programs/programs/tuktuk/src/instructions/run_task_v0.rs
Original file line number Diff line number Diff line change
Expand Up @@ -353,14 +353,19 @@ pub fn handler<'info>(
ctx: Context<'_, '_, '_, 'info, RunTaskV0<'info>>,
args: RunTaskArgsV0,
) -> Result<()> {
let now = Clock::get()?.unix_timestamp;
let task_time = match ctx.accounts.task.trigger {
TriggerV0::Now => now,
TriggerV0::Timestamp(timestamp) => timestamp,
};
ctx.accounts.task_queue.updated_at = now;
for id in args.free_task_ids.clone() {
require_gt!(
ctx.accounts.task_queue.capacity,
id,
ErrorCode::InvalidTaskId
);
}
ctx.accounts.task_queue.updated_at = Clock::get()?.unix_timestamp;
let remaining_accounts = ctx.remaining_accounts;

let transaction = match ctx.accounts.task.transaction.clone() {
Expand Down Expand Up @@ -447,11 +452,15 @@ pub fn handler<'info>(
);
}

let mut processor = TaskProcessor::new(ctx, &transaction, args.free_task_ids)?;
if now.saturating_sub(task_time) <= ctx.accounts.task_queue.stale_task_age as i64 {
let mut processor = TaskProcessor::new(ctx, &transaction, args.free_task_ids)?;

// Process each instruction
for ix in &transaction.instructions {
processor.process_instruction(ix, remaining_accounts)?;
// Process each instruction
for ix in &transaction.instructions {
processor.process_instruction(ix, remaining_accounts)?;
}
} else {
msg!("Task is stale, closing task");
}

msg!(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ pub struct UpdateTaskQueueArgsV0 {
pub capacity: Option<u16>,
pub lookup_tables: Option<Vec<Pubkey>>,
pub update_authority: Option<Pubkey>,
pub stale_task_age: Option<u32>,
}

#[derive(Accounts)]
Expand Down Expand Up @@ -52,6 +53,9 @@ pub fn handler(ctx: Context<UpdateTaskQueueV0>, args: UpdateTaskQueueArgsV0) ->
if let Some(update_authority) = args.update_authority {
ctx.accounts.task_queue.update_authority = update_authority;
}
if let Some(stale_task_age) = args.stale_task_age {
ctx.accounts.task_queue.stale_task_age = stale_task_age;
}

resize_to_fit(
&ctx.accounts.payer.to_account_info(),
Expand Down
5 changes: 5 additions & 0 deletions solana-programs/programs/tuktuk/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ pub struct TaskQueueV0 {
pub name: String,
pub lookup_tables: Vec<Pubkey>,
pub num_queue_authorities: u16,
// Age before a task is considered stale and can be run/deleted without running the instructions.
// The longer this value, the more likely you have stale tasks clogging up your queue, which can cause
// the queue to be full and prevent new tasks from being added.
// The shorter this value, the more difficult it will be to debug, as failed tasks dissappear.
pub stale_task_age: u32,
}

#[macro_export]
Expand Down
14 changes: 14 additions & 0 deletions tuktuk-cli/src/cmd/task_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ pub enum Cmd {
min_crank_reward: u64,
#[arg(long, help = "Lookup tables to create")]
lookup_tables: Option<Vec<Pubkey>>,
#[arg(
long,
help = "Age before a task is considered stale and can be deleted without running the instructions. This is effectively the retention rate for debugging purposes."
)]
stale_task_age: u32,
},
Update {
#[command(flatten)]
Expand All @@ -55,6 +60,11 @@ pub enum Cmd {
update_authority: Option<Pubkey>,
#[arg(long)]
capacity: Option<u16>,
#[arg(
long,
help = "Age before a task is considered stale and can be deleted without running the instructions. This is effectively the retention rate for debugging purposes."
)]
stale_task_age: Option<u32>,
},
Get {
#[command(flatten)]
Expand Down Expand Up @@ -137,6 +147,7 @@ impl TaskQueueCmd {
min_crank_reward,
funding_amount,
lookup_tables,
stale_task_age,
} => {
let client = opts.client().await?;

Expand All @@ -148,6 +159,7 @@ impl TaskQueueCmd {
min_crank_reward: *min_crank_reward,
name: name.clone(),
lookup_tables: lookup_tables.clone().unwrap_or_default(),
stale_task_age: *stale_task_age,
},
*update_authority,
)
Expand Down Expand Up @@ -204,6 +216,7 @@ impl TaskQueueCmd {
lookup_tables,
update_authority,
capacity,
stale_task_age,
} => {
let client = opts.client().await?;
let task_queue_key = task_queue.get_pubkey(&client).await?.ok_or_else(|| {
Expand All @@ -220,6 +233,7 @@ impl TaskQueueCmd {
min_crank_reward: *min_crank_reward,
lookup_tables: lookup_tables.clone(),
update_authority: *update_authority,
stale_task_age: *stale_task_age,
},
)
.await?;
Expand Down
2 changes: 1 addition & 1 deletion tuktuk-crank-turner/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "tuktuk-crank-turner"
version = "0.1.17"
version = "0.1.18"
authors.workspace = true
edition.workspace = true
license.workspace = true
Expand Down
18 changes: 16 additions & 2 deletions tuktuk-crank-turner/src/task_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -303,8 +303,8 @@ impl TimedTask {
ctx.task_queue
.add_task(TimedTask {
task: self.task.clone(),
total_retries: self.total_retries + 1,
// Try again in 30 seconds with exponential backoff
total_retries: 0,
// Try again when task is stale
task_time: now + retry_delay,
task_key: self.task_key,
task_queue_key: self.task_queue_key,
Expand All @@ -318,6 +318,20 @@ impl TimedTask {
"task {:?} failed after {} retries",
self.task_key, self.max_retries
);
let task_queue = self.get_task_queue(ctx.clone()).await;
ctx.task_queue
.add_task(TimedTask {
task: self.task.clone(),
total_retries: self.total_retries + 1,
// Try again in 30 seconds with exponential backoff
task_time: self.task_time + task_queue.stale_task_age as u64,
task_key: self.task_key,
task_queue_key: self.task_queue_key,
task_queue_name: self.task_queue_name.clone(),
max_retries: self.max_retries,
in_flight_task_ids: vec![],
})
.await?;
TASKS_FAILED
.with_label_values(&[self.task_queue_name.as_str(), "RetriesExceeded"])
.inc();
Expand Down
2 changes: 1 addition & 1 deletion tuktuk-program/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "tuktuk-program"
version = "0.1.0"
version = "0.2.0"
description = "Raw rust sdk for interacting with the tuktuk program"

authors.workspace = true
Expand Down
16 changes: 15 additions & 1 deletion tuktuk-program/idls/tuktuk.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"address": "tuktukUrfhXT6ZT77QTU8RQtvgL967uRuVagWF57zVA",
"metadata": {
"name": "tuktuk",
"version": "0.1.2",
"version": "0.1.3",
"spec": "0.1.0",
"description": "Created with Anchor"
},
Expand Down Expand Up @@ -652,6 +652,10 @@
"type": {
"vec": "pubkey"
}
},
{
"name": "stale_task_age",
"type": "u32"
}
]
}
Expand Down Expand Up @@ -848,6 +852,10 @@
{
"name": "num_queue_authorities",
"type": "u16"
},
{
"name": "stale_task_age",
"type": "u32"
}
]
}
Expand Down Expand Up @@ -1016,6 +1024,12 @@
"type": {
"option": "pubkey"
}
},
{
"name": "stale_task_age",
"type": {
"option": "u32"
}
}
]
}
Expand Down
2 changes: 1 addition & 1 deletion tuktuk-sdk/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "tuktuk-sdk"
version = "0.1.0"
version = "0.2.0"
authors.workspace = true
edition.workspace = true
license.workspace = true
Expand Down
Loading