
Commit

Merge branch 'main' into bodobolero/rename_duckdblib
Bodobolero authored Feb 23, 2025
2 parents 224ef56 + df26438 commit 9a42847
Showing 15 changed files with 853 additions and 342 deletions.
57 changes: 41 additions & 16 deletions compute_tools/src/spec_apply.rs
@@ -7,12 +7,12 @@ use std::sync::Arc;

use crate::compute::construct_superuser_query;
use crate::pg_helpers::{escape_literal, DatabaseExt, Escaping, GenericOptionsSearch, RoleExt};
use anyhow::{bail, Result};
use anyhow::Result;
use compute_api::spec::{ComputeFeature, ComputeSpec, Database, PgIdent, Role};
use futures::future::join_all;
use tokio::sync::RwLock;
use tokio_postgres::Client;
use tracing::{debug, info_span, Instrument};
use tracing::{debug, info_span, warn, Instrument};

#[derive(Clone)]
pub enum DB {
@@ -47,6 +47,11 @@ pub enum PerDatabasePhase
DeleteDBRoleReferences,
ChangeSchemaPerms,
HandleAnonExtension,
/// This is a shared phase, used for both i) dropping dangling LR subscriptions
/// before dropping the DB, and ii) dropping all subscriptions after creating
/// a fresh branch.
/// N.B. we will skip all DBs that are not present in Postgres, invalid, or
/// have `datallowconn = false` (`restrict_conn`).
DropLogicalSubscriptions,
}

@@ -168,7 +173,7 @@
///
/// In the future we may generate a single stream of changes and then
/// sort/merge/batch execution, but for now this is a nice way to improve
/// batching behaviour of the commands.
/// batching behavior of the commands.
async fn get_operations<'a>(
spec: &'a ComputeSpec,
ctx: &'a RwLock<MutableApplyContext>,
@@ -451,6 +456,38 @@
)),
}))),
ApplySpecPhase::RunInEachDatabase { db, subphase } => {
// Do some checks that user DB exists and we can access it.
//
// During the phases like DropLogicalSubscriptions, DeleteDBRoleReferences,
// which happen before dropping the DB, the current run could be a retry,
// so it's a valid case when DB is absent already. The case of
// `pg_database.datallowconn = false`/`restrict_conn` is a bit tricky, as
// in theory a user can have some dangling objects there, so we will fail at
// the actual drop later. Yet, to fix that in the current code we would need
// to ALTER DATABASE, and then check back, but that is even more invasive, so
// that's not what we really want to do here.
//
// For ChangeSchemaPerms, skipping DBs we cannot access is totally fine.
if let DB::UserDB(db) = db {
let databases = &ctx.read().await.dbs;

let edb = match databases.get(&db.name) {
Some(edb) => edb,
None => {
warn!("skipping RunInEachDatabase phase {:?}, database {} doesn't exist in PostgreSQL", subphase, db.name);
return Ok(Box::new(empty()));
}
};

if edb.restrict_conn || edb.invalid {
warn!(
"skipping RunInEachDatabase phase {:?}, database {} is (restrict_conn={}, invalid={})",
subphase, db.name, edb.restrict_conn, edb.invalid
);
return Ok(Box::new(empty()));
}
}

match subphase {
PerDatabasePhase::DropLogicalSubscriptions => {
match &db {
@@ -530,25 +567,12 @@
Ok(Box::new(operations))
}
PerDatabasePhase::ChangeSchemaPerms => {
let ctx = ctx.read().await;
let databases = &ctx.dbs;

let db = match &db {
// ignore schema permissions on the system database
DB::SystemDB => return Ok(Box::new(empty())),
DB::UserDB(db) => db,
};

if databases.get(&db.name).is_none() {
bail!("database {} doesn't exist in PostgreSQL", db.name);
}

let edb = databases.get(&db.name).unwrap();

if edb.restrict_conn || edb.invalid {
return Ok(Box::new(empty()));
}

let operations = vec![
Operation {
query: format!(
@@ -566,6 +590,7 @@

Ok(Box::new(operations))
}
// TODO: remove this completely https://github.com/neondatabase/cloud/issues/22663
PerDatabasePhase::HandleAnonExtension => {
// Only install Anon into user databases
let db = match &db {
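The body of the `DropLogicalSubscriptions` arm is collapsed in this view, so here is a minimal sketch of the kind of cleanup that subphase performs, based only on what the enum's doc comment states. The helper name `drop_all_subscriptions`, the catalog query, and the quoting are illustrative assumptions, not the code shipped in `compute_tools`:

```rust
use tokio_postgres::Client;

// Hypothetical helper (not part of this commit): drop every logical
// replication subscription in the current database, so the database can be
// dropped or a fresh branch starts without dangling subscriptions.
async fn drop_all_subscriptions(client: &Client) -> anyhow::Result<()> {
    // pg_subscription is a shared catalog, so filter to the current database.
    let rows = client
        .query(
            "SELECT subname FROM pg_subscription \
             WHERE subdbid = (SELECT oid FROM pg_database \
                              WHERE datname = current_database())",
            &[],
        )
        .await?;
    for row in rows {
        let subname: String = row.get(0);
        // Naive quoting for illustration; assumes names without embedded quotes.
        // DISABLE + SET (slot_name = NONE) lets DROP SUBSCRIPTION succeed even
        // when the publisher or its replication slot is no longer reachable.
        client
            .batch_execute(&format!(
                "ALTER SUBSCRIPTION \"{0}\" DISABLE; \
                 ALTER SUBSCRIPTION \"{0}\" SET (slot_name = NONE); \
                 DROP SUBSCRIPTION \"{0}\";",
                subname
            ))
            .await?;
    }
    Ok(())
}
```

The DISABLE and SET (slot_name = NONE) steps matter because a dangling subscription's publisher may be unreachable, and a plain DROP SUBSCRIPTION would otherwise try to drop the remote replication slot and fail.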
2 changes: 1 addition & 1 deletion libs/vm_monitor/Cargo.toml
@@ -1,7 +1,7 @@
[package]
name = "vm_monitor"
version = "0.1.0"
edition.workspace = true
edition = "2024"
license.workspace = true

[[bin]]
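The change above pins vm_monitor to Rust edition 2024 instead of inheriting the workspace edition, which presumably drives the import reshuffling in the vm_monitor files below: rustfmt's 2024 style edition sorts `use` items ASCIIbetically, so uppercase names such as `Context` now precede lowercase ones such as `bail`, and this repository's formatting also splits nested import groups into one `use` per module (an imports-granularity choice rather than an edition default). A small, self-contained illustration of the resulting style, using made-up code rather than anything from this commit:

```rust
// Illustration only: imports formatted per the 2024 style edition.
// Older style editions would have written `use anyhow::{bail, Context};`.
use anyhow::{Context, bail};
use std::fs;

fn main() -> anyhow::Result<()> {
    // Read the crate manifest just to exercise both imported items.
    let manifest = fs::read_to_string("Cargo.toml").context("reading Cargo.toml")?;
    if manifest.is_empty() {
        bail!("Cargo.toml is unexpectedly empty");
    }
    println!("read {} bytes", manifest.len());
    Ok(())
}
```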
10 changes: 4 additions & 6 deletions libs/vm_monitor/src/cgroup.rs
@@ -1,12 +1,10 @@
use std::fmt::{self, Debug, Formatter};
use std::time::{Duration, Instant};

use anyhow::{anyhow, Context};
use cgroups_rs::{
hierarchies::{self, is_cgroup2_unified_mode},
memory::MemController,
Subsystem,
};
use anyhow::{Context, anyhow};
use cgroups_rs::Subsystem;
use cgroups_rs::hierarchies::{self, is_cgroup2_unified_mode};
use cgroups_rs::memory::MemController;
use tokio::sync::watch;
use tracing::{info, warn};

12 changes: 5 additions & 7 deletions libs/vm_monitor/src/dispatcher.rs
@@ -6,17 +6,15 @@
//! the cgroup (requesting upscale), and the signals that go to the cgroup
//! (notifying it of upscale).
use anyhow::{bail, Context};
use anyhow::{Context, bail};
use axum::extract::ws::{Message, Utf8Bytes, WebSocket};
use futures::{
stream::{SplitSink, SplitStream},
SinkExt, StreamExt,
};
use futures::stream::{SplitSink, SplitStream};
use futures::{SinkExt, StreamExt};
use tracing::{debug, info};

use crate::protocol::{
OutboundMsg, OutboundMsgKind, ProtocolRange, ProtocolResponse, ProtocolVersion,
PROTOCOL_MAX_VERSION, PROTOCOL_MIN_VERSION,
OutboundMsg, OutboundMsgKind, PROTOCOL_MAX_VERSION, PROTOCOL_MIN_VERSION, ProtocolRange,
ProtocolResponse, ProtocolVersion,
};

/// The central handler for all communications in the monitor.
8 changes: 5 additions & 3 deletions libs/vm_monitor/src/filecache.rs
@@ -2,12 +2,14 @@
use std::num::NonZeroU64;

use crate::MiB;
use anyhow::{anyhow, Context};
use tokio_postgres::{types::ToSql, Client, NoTls, Row};
use anyhow::{Context, anyhow};
use tokio_postgres::types::ToSql;
use tokio_postgres::{Client, NoTls, Row};
use tokio_util::sync::CancellationToken;
use tracing::{error, info};

use crate::MiB;

/// Manages Postgres' file cache by keeping a connection open.
#[derive(Debug)]
pub struct FileCacheState {
22 changes: 12 additions & 10 deletions libs/vm_monitor/src/lib.rs
@@ -2,24 +2,26 @@
#![deny(clippy::undocumented_unsafe_blocks)]
#![cfg(target_os = "linux")]

use std::fmt::Debug;
use std::net::SocketAddr;
use std::time::Duration;

use anyhow::Context;
use axum::{
extract::{ws::WebSocket, State, WebSocketUpgrade},
response::Response,
};
use axum::{routing::get, Router};
use axum::Router;
use axum::extract::ws::WebSocket;
use axum::extract::{State, WebSocketUpgrade};
use axum::response::Response;
use axum::routing::get;
use clap::Parser;
use futures::Future;
use std::net::SocketAddr;
use std::{fmt::Debug, time::Duration};
use runner::Runner;
use sysinfo::{RefreshKind, System, SystemExt};
use tokio::net::TcpListener;
use tokio::{sync::broadcast, task::JoinHandle};
use tokio::sync::broadcast;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use tracing::{error, info};

use runner::Runner;

// Code that interfaces with agent
pub mod dispatcher;
pub mod protocol;
3 changes: 2 additions & 1 deletion libs/vm_monitor/src/protocol.rs
@@ -35,7 +35,8 @@
use core::fmt;
use std::cmp;

use serde::{de::Error, Deserialize, Serialize};
use serde::de::Error;
use serde::{Deserialize, Serialize};

/// A Message we send to the agent.
#[derive(Serialize, Deserialize, Debug, Clone)]
8 changes: 5 additions & 3 deletions libs/vm_monitor/src/runner.rs
@@ -7,7 +7,7 @@
use std::fmt::Debug;
use std::time::{Duration, Instant};

use anyhow::{bail, Context};
use anyhow::{Context, bail};
use axum::extract::ws::{Message, WebSocket};
use futures::StreamExt;
use tokio::sync::{broadcast, watch};
@@ -18,7 +18,7 @@ use crate::cgroup::{self, CgroupWatcher};
use crate::dispatcher::Dispatcher;
use crate::filecache::{FileCacheConfig, FileCacheState};
use crate::protocol::{InboundMsg, InboundMsgKind, OutboundMsg, OutboundMsgKind, Resources};
use crate::{bytes_to_mebibytes, get_total_system_memory, spawn_with_cancel, Args, MiB};
use crate::{Args, MiB, bytes_to_mebibytes, get_total_system_memory, spawn_with_cancel};

/// Central struct that interacts with agent, dispatcher, and cgroup to handle
/// signals from the agent.
@@ -233,7 +233,7 @@ impl Runner {
//
// TODO: make the duration here configurable.
if last_time.elapsed() > Duration::from_secs(5) {
bail!("haven't gotten cgroup memory stats recently enough to determine downscaling information");
bail!(
"haven't gotten cgroup memory stats recently enough to determine downscaling information"
);
} else if last_history.samples_count <= 1 {
let status = "haven't received enough cgroup memory stats yet";
info!(status, "discontinuing downscale");
