Skip to content

Commit

Permalink
fix comm
Browse files Browse the repository at this point in the history
  • Loading branch information
xxhZs committed Feb 19, 2025
1 parent 7a75978 commit 8a89615
Show file tree
Hide file tree
Showing 59 changed files with 323 additions and 284 deletions.
4 changes: 2 additions & 2 deletions proto/data.proto
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ message DataType {
SERIAL = 19;
INT256 = 20;
MAP = 21;
TIMESTAMP_NANOSECOND = 22;
Timestamp_ns = 22;
}
TypeName type_name = 1;
// Data length for char.
Expand Down Expand Up @@ -105,7 +105,7 @@ enum ArrayType {
SERIAL = 17;
INT256 = 18;
MAP = 20;
TIMESTAMP_NANOSECOND = 21;
Timestamp_ns = 21;
}

message Array {
Expand Down
1 change: 0 additions & 1 deletion src/batch/executors/src/executor/postgres_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@ fn postgres_cell_to_scalar_impl(
| DataType::Date
| DataType::Time
| DataType::Timestamp
| DataType::TimestampNanosecond
| DataType::Timestamptz
| DataType::Jsonb
| DataType::Interval
Expand Down
4 changes: 2 additions & 2 deletions src/common/src/array/arrow/arrow_iceberg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ impl ToArrow for IcebergArrowConvert {
DataType::Date => self.date_type_to_arrow(),
DataType::Time => self.time_type_to_arrow(),
DataType::Timestamp => self.timestamp_type_to_arrow(),
DataType::TimestampNanosecond => self.timestampns_type_to_arrow(),
DataType::TimestampNs => self.timestamp_ns_type_to_arrow(),
DataType::Timestamptz => self.timestamptz_type_to_arrow(),
DataType::Interval => self.interval_type_to_arrow(),
DataType::Varchar => self.varchar_type_to_arrow(),
Expand Down Expand Up @@ -247,7 +247,7 @@ impl ToArrow for IcebergCreateTableArrowConvert {
DataType::Date => self.date_type_to_arrow(),
DataType::Time => self.time_type_to_arrow(),
DataType::Timestamp => self.timestamp_type_to_arrow(),
DataType::TimestampNanosecond => self.timestampns_type_to_arrow(),
DataType::TimestampNs => self.timestamp_ns_type_to_arrow(),
DataType::Timestamptz => self.timestamptz_type_to_arrow(),
DataType::Interval => self.interval_type_to_arrow(),
DataType::Varchar => self.varchar_type_to_arrow(),
Expand Down
42 changes: 20 additions & 22 deletions src/common/src/array/arrow/arrow_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ pub trait ToArrow {
ArrayImpl::Date(array) => self.date_to_arrow(array),
ArrayImpl::Time(array) => self.time_to_arrow(array),
ArrayImpl::Timestamp(array) => self.timestamp_to_arrow(array),
ArrayImpl::TimestampNanosecond(array) => self.timestampns_to_arrow(array),
ArrayImpl::TimestampNs(array) => self.timestamp_ns_to_arrow(array),
ArrayImpl::Timestamptz(array) => self.timestamptz_to_arrow(array),
ArrayImpl::Interval(array) => self.interval_to_arrow(array),
ArrayImpl::Utf8(array) => self.utf8_to_arrow(array),
Expand Down Expand Up @@ -182,9 +182,9 @@ pub trait ToArrow {
}

#[inline]
fn timestampns_to_arrow(
fn timestamp_ns_to_arrow(
&self,
array: &TimestampNanosecondArray,
array: &TimestampNsArray,
) -> Result<arrow_array::ArrayRef, ArrayError> {
Ok(Arc::new(arrow_array::TimestampNanosecondArray::from(array)))
}
Expand Down Expand Up @@ -328,7 +328,7 @@ pub trait ToArrow {
DataType::Date => self.date_type_to_arrow(),
DataType::Time => self.time_type_to_arrow(),
DataType::Timestamp => self.timestamp_type_to_arrow(),
DataType::TimestampNanosecond => self.timestampns_type_to_arrow(),
DataType::TimestampNs => self.timestamp_ns_type_to_arrow(),
DataType::Timestamptz => self.timestamptz_type_to_arrow(),
DataType::Interval => self.interval_type_to_arrow(),
DataType::Varchar => self.varchar_type_to_arrow(),
Expand Down Expand Up @@ -393,7 +393,7 @@ pub trait ToArrow {
arrow_schema::DataType::Timestamp(arrow_schema::TimeUnit::Microsecond, None)
}
#[inline]
fn timestampns_type_to_arrow(&self) -> arrow_schema::DataType {
fn timestamp_ns_type_to_arrow(&self) -> arrow_schema::DataType {
arrow_schema::DataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, None)
}

Expand Down Expand Up @@ -537,7 +537,7 @@ pub trait FromArrow {
Timestamp(Second, Some(_)) => DataType::Timestamptz,
Timestamp(Millisecond, None) => DataType::Timestamp,
Timestamp(Millisecond, Some(_)) => DataType::Timestamptz,
Timestamp(Nanosecond, None) => DataType::TimestampNanosecond,
Timestamp(Nanosecond, None) => DataType::TimestampNs,
Timestamp(Nanosecond, Some(_)) => DataType::Timestamptz,
Interval(MonthDayNano) => DataType::Interval,
Utf8 => DataType::Varchar,
Expand Down Expand Up @@ -636,10 +636,10 @@ pub trait FromArrow {
self.from_timestampus_some_array(array.as_any().downcast_ref().unwrap())
}
Timestamp(Nanosecond, None) => {
self.from_timestampns_array(array.as_any().downcast_ref().unwrap())
self.from_timestamp_ns_array(array.as_any().downcast_ref().unwrap())
}
Timestamp(Nanosecond, Some(_)) => {
self.from_timestampns_some_array(array.as_any().downcast_ref().unwrap())
self.from_timestamp_ns_some_array(array.as_any().downcast_ref().unwrap())
}
Interval(MonthDayNano) => {
self.from_interval_array(array.as_any().downcast_ref().unwrap())
Expand Down Expand Up @@ -811,14 +811,14 @@ pub trait FromArrow {
Ok(ArrayImpl::Timestamptz(array.into()))
}

fn from_timestampns_array(
fn from_timestamp_ns_array(
&self,
array: &arrow_array::TimestampNanosecondArray,
) -> Result<ArrayImpl, ArrayError> {
Ok(ArrayImpl::TimestampNanosecond(array.into()))
Ok(ArrayImpl::TimestampNs(array.into()))
}

fn from_timestampns_some_array(
fn from_timestamp_ns_some_array(
&self,
array: &arrow_array::TimestampNanosecondArray,
) -> Result<ArrayImpl, ArrayError> {
Expand Down Expand Up @@ -1060,10 +1060,10 @@ converts_with_timeunit!(TimestamptzArray, arrow_array::TimestampMillisecondArray
converts_with_timeunit!(TimestamptzArray, arrow_array::TimestampMicrosecondArray, TimeUnit::Microsecond, @map);
converts_with_timeunit!(TimestamptzArray, arrow_array::TimestampNanosecondArray, TimeUnit::Nanosecond, @map);

converts_with_timeunit!(TimestampNanosecondArray, arrow_array::TimestampSecondArray, TimeUnit::Second, @map);
converts_with_timeunit!(TimestampNanosecondArray, arrow_array::TimestampMillisecondArray, TimeUnit::Millisecond, @map);
converts_with_timeunit!(TimestampNanosecondArray, arrow_array::TimestampMicrosecondArray, TimeUnit::Microsecond, @map);
converts_with_timeunit!(TimestampNanosecondArray, arrow_array::TimestampNanosecondArray, TimeUnit::Nanosecond, @map);
converts_with_timeunit!(TimestampNsArray, arrow_array::TimestampSecondArray, TimeUnit::Second, @map);
converts_with_timeunit!(TimestampNsArray, arrow_array::TimestampMillisecondArray, TimeUnit::Millisecond, @map);
converts_with_timeunit!(TimestampNsArray, arrow_array::TimestampMicrosecondArray, TimeUnit::Microsecond, @map);
converts_with_timeunit!(TimestampNsArray, arrow_array::TimestampNanosecondArray, TimeUnit::Nanosecond, @map);

/// Converts RisingWave value from and into Arrow value.
trait FromIntoArrow {
Expand Down Expand Up @@ -1181,24 +1181,22 @@ impl FromIntoArrowWithUnit for Timestamp {
}
}

impl FromIntoArrowWithUnit for TimestampNanosecond {
impl FromIntoArrowWithUnit for TimestampNs {
type ArrowType = i64;
type TimestampType = TimeUnit;

fn from_arrow_with_unit(value: Self::ArrowType, time_unit: Self::TimestampType) -> Self {
match time_unit {
TimeUnit::Second => {
TimestampNanosecond(DateTime::from_timestamp(value as _, 0).unwrap().naive_utc())
TimestampNs(DateTime::from_timestamp(value as _, 0).unwrap().naive_utc())
}
TimeUnit::Millisecond => {
TimestampNanosecond(DateTime::from_timestamp_millis(value).unwrap().naive_utc())
TimestampNs(DateTime::from_timestamp_millis(value).unwrap().naive_utc())
}
TimeUnit::Microsecond => {
TimestampNanosecond(DateTime::from_timestamp_micros(value).unwrap().naive_utc())
}
TimeUnit::Nanosecond => {
TimestampNanosecond(DateTime::from_timestamp_nanos(value).naive_utc())
TimestampNs(DateTime::from_timestamp_micros(value).unwrap().naive_utc())
}
TimeUnit::Nanosecond => TimestampNs(DateTime::from_timestamp_nanos(value).naive_utc()),
}
}

Expand Down
6 changes: 3 additions & 3 deletions src/common/src/array/chrono_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,19 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use super::{PrimitiveArray, PrimitiveArrayBuilder, TimestampNanosecond};
use super::{PrimitiveArray, PrimitiveArrayBuilder, TimestampNs};
use crate::types::{Date, Time, Timestamp, Timestamptz};

pub type DateArray = PrimitiveArray<Date>;
pub type TimeArray = PrimitiveArray<Time>;
pub type TimestampArray = PrimitiveArray<Timestamp>;
pub type TimestampNanosecondArray = PrimitiveArray<TimestampNanosecond>;
pub type TimestampNsArray = PrimitiveArray<TimestampNs>;
pub type TimestamptzArray = PrimitiveArray<Timestamptz>;

pub type DateArrayBuilder = PrimitiveArrayBuilder<Date>;
pub type TimeArrayBuilder = PrimitiveArrayBuilder<Time>;
pub type TimestampArrayBuilder = PrimitiveArrayBuilder<Timestamp>;
pub type TimestampNanosecondArrayBuilder = PrimitiveArrayBuilder<TimestampNanosecond>;
pub type TimestampNsArrayBuilder = PrimitiveArrayBuilder<TimestampNs>;
pub type TimestamptzArrayBuilder = PrimitiveArrayBuilder<Timestamptz>;

#[cfg(test)]
Expand Down
4 changes: 2 additions & 2 deletions src/common/src/array/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ pub use bool_array::{BoolArray, BoolArrayBuilder};
pub use bytes_array::*;
pub use chrono_array::{
DateArray, DateArrayBuilder, TimeArray, TimeArrayBuilder, TimestampArray,
TimestampArrayBuilder, TimestampNanosecondArray, TimestampNanosecondArrayBuilder,
TimestamptzArray, TimestamptzArrayBuilder,
TimestampArrayBuilder, TimestampNsArray, TimestampNsArrayBuilder, TimestamptzArray,
TimestamptzArrayBuilder,
};
pub use data_chunk::{DataChunk, DataChunkTestExt};
pub use data_chunk_iter::RowRef;
Expand Down
2 changes: 1 addition & 1 deletion src/common/src/array/primitive_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ impl_primitive_for_others! {
{ Date, Date, Date },
{ Time, Time, Time },
{ Timestamp, Timestamp, Timestamp },
{ TimestampNanosecond, TimestampNanosecond, TimestampNanosecond },
{ TimestampNs, TimestampNs, TimestampNs },
{ Timestamptz, Timestamptz, Timestamptz }
}

Expand Down
4 changes: 1 addition & 3 deletions src/common/src/array/proto_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,7 @@ impl ArrayImpl {
PbArrayType::Date => read_primitive_array::<Date>(array, cardinality)?,
PbArrayType::Time => read_primitive_array::<Time>(array, cardinality)?,
PbArrayType::Timestamp => read_primitive_array::<Timestamp>(array, cardinality)?,
PbArrayType::TimestampNanosecond => {
read_primitive_array::<TimestampNanosecond>(array, cardinality)?
}
PbArrayType::TimestampNs => read_primitive_array::<TimestampNs>(array, cardinality)?,
PbArrayType::Timestamptz => read_primitive_array::<Timestamptz>(array, cardinality)?,
PbArrayType::Interval => read_primitive_array::<Interval>(array, cardinality)?,
PbArrayType::Jsonb => JsonbArray::from_protobuf(array)?,
Expand Down
8 changes: 4 additions & 4 deletions src/common/src/hash/key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use static_assertions::const_assert_eq;
use crate::array::{ListValue, MapValue, StructValue};
use crate::types::{
DataType, Date, Decimal, Int256, Int256Ref, JsonbVal, Scalar, ScalarRef, ScalarRefImpl, Serial,
Time, Timestamp, TimestampNanosecond, Timestamptz, F32, F64,
Time, Timestamp, TimestampNs, Timestamptz, F32, F64,
};
use crate::util::hash_util::{Crc32FastBuilder, XxHash64Builder};
use crate::util::sort_util::OrderType;
Expand Down Expand Up @@ -584,7 +584,7 @@ impl HashKeyDe for Timestamp {
}
}

impl HashKeySer<'_> for TimestampNanosecond {
impl HashKeySer<'_> for TimestampNs {
fn serialize_into(self, mut buf: impl BufMut) {
buf.put_i64_ne(self.0.and_utc().timestamp());
buf.put_u32_ne(self.0.and_utc().timestamp_subsec_nanos());
Expand All @@ -595,11 +595,11 @@ impl HashKeySer<'_> for TimestampNanosecond {
}
}

impl HashKeyDe for TimestampNanosecond {
impl HashKeyDe for TimestampNs {
fn deserialize(_data_type: &DataType, mut buf: impl Buf) -> Self {
let secs = buf.get_i64_ne();
let nsecs = buf.get_u32_ne();
TimestampNanosecond::with_secs_nsecs(secs, nsecs).unwrap()
TimestampNs::with_secs_nsecs(secs, nsecs).unwrap()
}
}

Expand Down
8 changes: 3 additions & 5 deletions src/common/src/row/owned_row.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ use risingwave_common_estimate_size::EstimateSize;

use super::Row;
use crate::types::{
DataType, Date, Datum, DatumRef, Decimal, Interval, ScalarImpl, Time, Timestamp,
TimestampNanosecond, ToDatumRef,
DataType, Date, Datum, DatumRef, Decimal, Interval, ScalarImpl, Time, Timestamp, TimestampNs,
ToDatumRef,
};
use crate::util::iter_util::ZipEqDebug;
use crate::util::value_encoding;
Expand Down Expand Up @@ -82,9 +82,7 @@ impl OwnedRow {
DataType::Date => x.parse::<Date>().unwrap().into(),
DataType::Time => x.parse::<Time>().unwrap().into(),
DataType::Timestamp => x.parse::<Timestamp>().unwrap().into(),
DataType::TimestampNanosecond => {
x.parse::<TimestampNanosecond>().unwrap().into()
}
DataType::TimestampNs => x.parse::<TimestampNs>().unwrap().into(),
DataType::Interval => x.parse::<Interval>().unwrap().into(),
DataType::Decimal => x.parse::<Decimal>().unwrap().into(),
_ => todo!(),
Expand Down
6 changes: 3 additions & 3 deletions src/common/src/test_utils/rand_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use rand::{Rng, SeedableRng};
use crate::array::{Array, ArrayBuilder, ArrayRef, ListValue, MapValue, StructValue};
use crate::types::{
DataType, Date, Decimal, Int256, Interval, JsonbVal, MapType, NativeType, Scalar, Serial, Time,
Timestamp, TimestampNanosecond, Timestamptz,
Timestamp, TimestampNs, Timestamptz,
};

pub trait RandValue {
Expand Down Expand Up @@ -106,9 +106,9 @@ impl RandValue for Timestamp {
}
}

impl RandValue for TimestampNanosecond {
impl RandValue for TimestampNs {
fn rand_value<R: Rng>(rand: &mut R) -> Self {
TimestampNanosecond::new(Date::rand_value(rand).0.and_time(Time::rand_value(rand).0))
TimestampNs::new(Date::rand_value(rand).0.and_time(Time::rand_value(rand).0))
}
}

Expand Down
8 changes: 4 additions & 4 deletions src/common/src/test_utils/rand_chunk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@

use crate::array::{
BoolArray, DataChunk, DateArray, DecimalArray, F32Array, F64Array, I16Array, I32Array,
I64Array, Int256Array, IntervalArray, SerialArray, TimeArray, TimestampArray,
TimestampNanosecondArray, TimestamptzArray, Utf8Array,
I64Array, Int256Array, IntervalArray, SerialArray, TimeArray, TimestampArray, TimestampNsArray,
TimestamptzArray, Utf8Array,
};
use crate::test_utils::rand_array::seed_rand_array_ref;
use crate::types::DataType;
Expand All @@ -38,8 +38,8 @@ pub fn gen_chunk(data_types: &[DataType], size: usize, seed: u64, null_ratio: f6
DataType::Time => seed_rand_array_ref::<TimeArray>(size, seed, null_ratio),
DataType::Serial => seed_rand_array_ref::<SerialArray>(size, seed, null_ratio),
DataType::Timestamp => seed_rand_array_ref::<TimestampArray>(size, seed, null_ratio),
DataType::TimestampNanosecond => {
seed_rand_array_ref::<TimestampNanosecondArray>(size, seed, null_ratio)
DataType::TimestampNs => {
seed_rand_array_ref::<TimestampNsArray>(size, seed, null_ratio)
}
DataType::Timestamptz => {
seed_rand_array_ref::<TimestamptzArray>(size, seed, null_ratio)
Expand Down
Loading

0 comments on commit 8a89615

Please sign in to comment.