Skip to content

Commit

Permalink
[To rel/0.13] support create aligned timeseries (#51)
Browse files Browse the repository at this point in the history
Co-authored-by: fikers <fikersfan@gmail.com>
  • Loading branch information
HTHou and fikersd authored Sep 14, 2022
1 parent 9e78007 commit c8dadf6
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 0 deletions.
47 changes: 47 additions & 0 deletions client/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,53 @@ func (s *Session) CreateTimeseries(path string, dataType TSDataType, encoding TS
return status, err
}

/*
*create single aligned time series
*params
*prefixPath: string, time series prefix path (starts from root)
*measurements: []string, sensor names
*dataTypes: []int32, data types for time series
*encodings: []int32, encodings for time series
*compressors: []int32, compressing types for time series
*measurementAlias: []string, sensor names alias
*return
*error: correctness of operation
*/
func (s *Session) CreateAlignedTimeseries(prefixPath string, measurements []string, dataTypes []TSDataType, encodings []TSEncoding, compressors []TSCompressionType, measurementAlias []string) (r *rpc.TSStatus, err error) {
destTypes := make([]int32, len(dataTypes))
for i, t := range dataTypes {
destTypes[i] = int32(t)
}

destEncodings := make([]int32, len(encodings))
for i, e := range encodings {
destEncodings[i] = int32(e)
}

destCompressions := make([]int32, len(compressors))
for i, e := range compressors {
destCompressions[i] = int32(e)
}

request := rpc.TSCreateAlignedTimeseriesReq{
SessionId: s.sessionId,
PrefixPath: prefixPath,
Measurements: measurements,
DataTypes: destTypes,
Encodings: destEncodings,
Compressors: destCompressions,
MeasurementAlias: measurementAlias,
}
status, err := s.client.CreateAlignedTimeseries(context.Background(), &request)
if err != nil && status == nil {
if s.reconnect() {
request.SessionId = s.sessionId
status, err = s.client.CreateAlignedTimeseries(context.Background(), &request)
}
}
return status, err
}

/*
*create multiple time series
*params
Expand Down
22 changes: 22 additions & 0 deletions example/session_example.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ func main() {
createMultiTimeseries()
deleteTimeseries("root.sg1.dev1.temperature")

createAlignedTimeseries("root.sg1.dev1", []string{"status", "temperature"}, []string{"sts", "temp"})
deleteTimeseries("root.sg1.dev1.status")
deleteTimeseries("root.sg1.dev1.temperature")

insertStringRecord()
deleteTimeseries("root.ln.wf02.wt02.hardware")

Expand Down Expand Up @@ -291,6 +295,24 @@ func createTimeseries(path string) {
checkError(session.CreateTimeseries(path, dataType, encoding, compressor, nil, nil))
}

func createAlignedTimeseries(prefixPath string, measurements, measurementAlias []string) {
var (
dataTypes = []client.TSDataType{
client.FLOAT,
client.FLOAT,
}
encodings = []client.TSEncoding{
client.PLAIN,
client.PLAIN,
}
compressors = []client.TSCompressionType{
client.LZ4,
client.LZ4,
}
)
checkError(session.CreateAlignedTimeseries(prefixPath, measurements, dataTypes, encodings, compressors, measurementAlias))
}

func createMultiTimeseries() {
var (
paths = []string{"root.sg1.dev1.temperature"}
Expand Down
34 changes: 34 additions & 0 deletions test/e2e/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,40 @@ func (s *e2eTestSuite) Test_CreateTimeseries() {
assert.Equal(timeseries, "root.tsg1.dev1.status")
}

func (s *e2eTestSuite) Test_CreateAlignedTimeseries() {
var (
prefixPath = "root.tsg1.dev1"
measurements = []string{"status", "temperature"}
measurementAlias = []string{"sts", "temp"}
dataTypes = []client.TSDataType{
client.FLOAT,
client.FLOAT,
}
encodings = []client.TSEncoding{
client.PLAIN,
client.PLAIN,
}
compressors = []client.TSCompressionType{
client.LZ4,
client.LZ4,
}
)
s.checkError(s.session.CreateAlignedTimeseries(prefixPath, measurements, dataTypes, encodings, compressors, measurementAlias))
for i := range measurements {
fullPath := fmt.Sprintf("root.tsg1.dev1.%s", measurements[i])
ds, err := s.session.ExecuteQueryStatement(fmt.Sprintf("show timeseries %s", fullPath), nil)

assert := s.Require()

assert.NoError(err)
defer ds.Close()
assert.True(ds.Next())
var timeseries string
assert.NoError(ds.Scan(&timeseries))
assert.Equal(timeseries, fullPath)
}
}

func (s *e2eTestSuite) Test_InsertRecords() {
var (
deviceId = []string{"root.tsg1.dev1"}
Expand Down

0 comments on commit c8dadf6

Please sign in to comment.