Skip to content

Commit

Permalink
Fixed many bugs. Improved stability and added features
Browse files Browse the repository at this point in the history
  • Loading branch information
coderReview committed Apr 2, 2024
1 parent 96b5f1c commit 2d8ccad
Show file tree
Hide file tree
Showing 22 changed files with 841 additions and 534 deletions.
18 changes: 10 additions & 8 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ npm-debug.log*
yarn-debug.log*
yarn-error.log*

node_modules/

# Runtime data
pids
*.pid
Expand All @@ -19,12 +17,16 @@ lib-cov
# Coverage directory used by tools like istanbul
coverage

# Compiled binary addons (https://nodejs.org/api/addons.html)
dist/
artifacts/
work/
ci/
e2e-results/
node_modules
dist
vendor
.tscache
coverage
.idea
.vscode

# OS generated files
.DS_Store

# Editor
.idea
Expand Down
2 changes: 1 addition & 1 deletion dist/module.js

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion dist/module.js.map

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion dist/plugin.json
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
{"name": "Annotations Editor", "path": "img/annotations.png"}
],
"version": "5.0.0-alpha",
"updated": "2024-03-05"
"updated": "2024-04-02"
},
"dependencies": {
"grafanaDependency": ">=9.3.0",
Expand Down
9 changes: 5 additions & 4 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,15 @@
"start": "yarn watch",
"test": "jest --watch --onlyChanged",
"test:ci": "jest --passWithNoTests --maxWorkers 4",
"typecheck": "tsc --noEmit"
"typecheck": "tsc --noEmit",
"build:backend": "mage -v build:linux && mage -v build:windows && mage -v build:darwin"
},
"author": "GridProtectionAlliance",
"license": "Apache-2.0",
"devDependencies": {
"@babel/core": "^7.21.4",
"@grafana/e2e": "10.3.3",
"@grafana/e2e-selectors": "10.3.3",
"@grafana/e2e": "latest",
"@grafana/e2e-selectors": "latest",
"@grafana/eslint-config": "^6.0.0",
"@grafana/tsconfig": "^1.2.0-rc1",
"@swc/core": "^1.3.90",
Expand Down Expand Up @@ -63,7 +64,7 @@
"@emotion/css": "11.10.6",
"@grafana/data": "latest",
"@grafana/runtime": "latest",
"@grafana/schema": "10.3.3",
"@grafana/schema": "latest",
"@grafana/ui": "latest",
"react": "18.2.0",
"react-dom": "18.2.0",
Expand Down
2 changes: 1 addition & 1 deletion pkg/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func main() {
// ID). When datasource configuration changed Dispose method will be called and
// new datasource instance created using NewSampleDatasource factory.
if err := datasource.Manage("gridprotectionalliance-osisoftpi-datasource", plugin.NewPIWebAPIDatasource, datasource.ManageOpts{}); err != nil {
log.DefaultLogger.Error(err.Error())
log.DefaultLogger.Error("Manage", "Plugin", err.Error())
os.Exit(1)
}
}
4 changes: 2 additions & 2 deletions pkg/plugin/annotation_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
"github.com/grafana/grafana-plugin-sdk-go/data"
)

func (d Datasource) processAnnotationQuery(ctx context.Context, query backend.DataQuery) PiProcessedAnnotationQuery {
func (d *Datasource) processAnnotationQuery(ctx context.Context, query backend.DataQuery) PiProcessedAnnotationQuery {
var ProcessedQuery PiProcessedAnnotationQuery
var PiAnnotationQuery PIAnnotationQuery

Expand Down Expand Up @@ -73,7 +73,7 @@ func (d Datasource) processAnnotationQuery(ctx context.Context, query backend.Da
return ProcessedQuery
}

func (q *PiProcessedAnnotationQuery) getTimeRangeURIComponent() string {
func (q PiProcessedAnnotationQuery) getTimeRangeURIComponent() string {
return "&startTime=" + q.TimeRange.From.UTC().Format(time.RFC3339) + "&endTime=" + q.TimeRange.To.UTC().Format(time.RFC3339)
}

Expand Down
60 changes: 46 additions & 14 deletions pkg/plugin/datasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func NewPIWebAPIDatasource(settings backend.DataSourceInstanceSettings) (instanc
var dataSourceOptions PIWebAPIDataSourceJsonData
err := json.Unmarshal(settings.JSONData, &dataSourceOptions)
if err != nil {
panic(err)
return nil, fmt.Errorf("http Unmarshal: %w", err)
}

opts, err := settings.HTTPClientOptions()
Expand All @@ -54,9 +54,9 @@ func NewPIWebAPIDatasource(settings backend.DataSourceInstanceSettings) (instanc

webIDCache := newWebIDCache()

// Create a new scheduler that will be used to clean the webIDCache every 5 minutes.
// Create a new scheduler that will be used to clean the webIDCache every 60 minutes.
scheduler := gocron.NewScheduler(time.UTC)
scheduler.Every(5).Minute().Do(cleanWebIDCache, webIDCache)
scheduler.Every(1).Hour().Do(cleanWebIDCache, webIDCache)
scheduler.StartAsync()

ds := &Datasource{
Expand All @@ -65,16 +65,22 @@ func NewPIWebAPIDatasource(settings backend.DataSourceInstanceSettings) (instanc
webIDCache: webIDCache,
scheduler: scheduler,
websocketConnectionsMutex: &sync.Mutex{},
sendersByWebIDMutex: &sync.Mutex{},
datasourceMutex: &sync.Mutex{},
channelConstruct: make(map[string]StreamChannelConstruct),
websocketConnections: make(map[string]*websocket.Conn),
sendersByWebID: make(map[string]map[*backend.StreamSender]bool),
streamChannels: make(map[string]chan []byte),
dataSourceOptions: &dataSourceOptions,
initalTime: time.Now(),
totalCalls: 0,
callRate: 0.0,
}

// Create a new query mux and assign it to the datasource.
ds.queryMux = ds.newQueryMux()

backend.Logger.Info("NewPIWebAPIDatasource Created")

return ds, nil
}

Expand All @@ -85,6 +91,34 @@ func (d *Datasource) Dispose() {
d.httpClient.CloseIdleConnections()
}

func (d *Datasource) updateRate() {
d.datasourceMutex.Lock()
/// show stats
modCall := d.totalCalls % 50
if modCall == 0 {
backend.Logger.Info("Processing QueryTSData End", "CallRate", d.callRate)
}

// update data
d.totalCalls += 1
d.callRate = float64(d.totalCalls) / float64(time.Now().Unix()-d.initalTime.Unix())

// backpressure
if d.callRate > 500 {
time.Sleep(time.Duration(d.callRate) * time.Millisecond)
backend.Logger.Info("Processing QueryTSData BackPressure", "CallRate", d.callRate)
}

// reset every 5 minutes
if time.Since(d.initalTime).Seconds() > 30 {
d.initalTime = time.Now()
d.totalCalls = 1
d.callRate = float64(d.totalCalls) / float64(time.Now().Unix()-d.initalTime.Unix())
backend.Logger.Info("Processing QueryTSData ResetTime", "CallRate", d.callRate)
}
d.datasourceMutex.Unlock()
}

// newQueryMux creates a new query mux used for routing queries to the correct handler.
func (d *Datasource) newQueryMux() *datasource.QueryTypeMux {
mux := datasource.NewQueryTypeMux()
Expand Down Expand Up @@ -112,23 +146,19 @@ func (d *Datasource) QueryData(ctx context.Context, req *backend.QueryDataReques
// TODO: Missing functionality: Add Replace Bad Values
// QueryTSData is called by Grafana when a user executes a time series data query.
func (d *Datasource) QueryTSData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {

// //TODO: Remove this debug information
// jsonReq, err := json.Marshal(req)
// if err != nil {
// return nil, fmt.Errorf("error marshaling QueryDataRequest: %v", err)
// }

// backend.Logger.Info("QueryDataRequest: ", string(jsonReq))
// backend.Logger.Info("QueryDataRequest: ", "REQUEST", string(jsonReq))
// end remove this debug information

processedPIWebAPIQueries := make(map[string][]PiProcessedQuery)
datasourceUID := req.PluginContext.DataSourceInstanceSettings.UID

// Process queries generic query objects and turn them into a suitable format for the PI Web API
for _, q := range req.Queries {
backend.Logger.Info("Processing Query", "RefID", q.RefID)
processedPIWebAPIQueries[q.RefID] = d.processQuery(ctx, q, datasourceUID)
processedPIWebAPIQueries[q.RefID] = d.processQuery(q, datasourceUID)
}

// Send the queries to the PI Web API
Expand All @@ -137,6 +167,9 @@ func (d *Datasource) QueryTSData(ctx context.Context, req *backend.QueryDataRequ
// Convert the PI Web API response into Grafana frames
response := d.processBatchtoFrames(processedQueries_temp)

// Update rate and do backpressure
d.updateRate()

return response, nil
}

Expand All @@ -162,7 +195,7 @@ func (d *Datasource) QueryAnnotations(ctx context.Context, req *backend.QueryDat
),
)

backend.Logger.Info("Processing Annotation Query", "RefID", q.RefID)
// backend.Logger.Info("Processing Annotation Query", "RefID", q.RefID)
// Process the annotation query request, extracting only the useful information
ProcessedAnnotationQuery := d.processAnnotationQuery(ctx, q)
span.AddEvent("Completed processing annotation query request")
Expand All @@ -174,7 +207,7 @@ func (d *Datasource) QueryAnnotations(ctx context.Context, req *backend.QueryDat
if len(ProcessedAnnotationQuery.Attributes) > 0 {
attributeURLs, err := ProcessedAnnotationQuery.getEventFrameAttributeQueryURL()
if err != nil {
backend.Logger.Error("Error getting attribute URLs", "Error", err)
return nil, fmt.Errorf("error getting attribute URLs: %w", err)
}
batchReq = d.buildAnnotationBatch(url, attributeURLs...)
} else {
Expand All @@ -192,8 +225,7 @@ func (d *Datasource) QueryAnnotations(ctx context.Context, req *backend.QueryDat

annotationFrame, err := convertAnnotationResponseToFrame(ProcessedAnnotationQuery.RefID, r, ProcessedAnnotationQuery.AttributesEnabled)
if err != nil {
backend.Logger.Error("error converting response to frame: %w", err)
continue
return nil, fmt.Errorf("error converting response to frame: %w", err)
}

span.AddEvent("Converted response to Grafana frame")
Expand Down
6 changes: 5 additions & 1 deletion pkg/plugin/datasource_models.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package plugin
import (
"net/http"
"sync"
"time"

"github.com/go-co-op/gocron"
"github.com/gorilla/websocket"
Expand All @@ -17,13 +18,16 @@ type Datasource struct {
httpClient *http.Client
webIDCache WebIDCache
channelConstruct map[string]StreamChannelConstruct
datasourceMutex *sync.Mutex
scheduler *gocron.Scheduler
websocketConnectionsMutex *sync.Mutex
sendersByWebIDMutex *sync.Mutex
websocketConnections map[string]*websocket.Conn
sendersByWebID map[string]map[*backend.StreamSender]bool
streamChannels map[string]chan []byte
dataSourceOptions *PIWebAPIDataSourceJsonData
initalTime time.Time
totalCalls int
callRate float64
}

type PIWebAPIDataSourceJsonData struct {
Expand Down
Loading

0 comments on commit 2d8ccad

Please sign in to comment.