From dd18bffa7d6f75a00ca03e843603ba5a6e786900 Mon Sep 17 00:00:00 2001 From: 0xterminator Date: Sun, 16 Feb 2025 12:05:10 +0200 Subject: [PATCH] feat(repo): Added api service initial skeleton --- Cargo.lock | 33 ++ Makefile | 23 + Tiltfile | 30 ++ .../fuel-streams/dashboards/api-metrics.json | 496 ++++++++++++++++++ .../templates/api/certificate.yaml | 68 +++ .../templates/api/deployment.yaml | 25 + cluster/charts/fuel-streams/values-local.yaml | 14 + cluster/charts/fuel-streams/values.yaml | 74 +++ cluster/docker/docker-compose.yml | 8 + cluster/docker/sv-api.Dockerfile | 75 +++ scripts/run_api.sh | 83 +++ services/api/Cargo.toml | 67 +++ services/api/README.md | 46 ++ services/api/src/cli.rs | 34 ++ services/api/src/config.rs | 58 ++ services/api/src/lib.rs | 23 + services/api/src/main.rs | 40 ++ services/api/src/metrics.rs | 349 ++++++++++++ services/api/src/server/errors.rs | 109 ++++ services/api/src/server/handlers/blocks.rs | 86 +++ services/api/src/server/handlers/mod.rs | 31 ++ services/api/src/server/mod.rs | 3 + services/api/src/server/state.rs | 80 +++ 23 files changed, 1855 insertions(+) create mode 100644 cluster/charts/fuel-streams/dashboards/api-metrics.json create mode 100644 cluster/charts/fuel-streams/templates/api/certificate.yaml create mode 100644 cluster/charts/fuel-streams/templates/api/deployment.yaml create mode 100644 cluster/docker/sv-api.Dockerfile create mode 100755 scripts/run_api.sh create mode 100644 services/api/Cargo.toml create mode 100644 services/api/README.md create mode 100644 services/api/src/cli.rs create mode 100644 services/api/src/config.rs create mode 100644 services/api/src/lib.rs create mode 100644 services/api/src/main.rs create mode 100644 services/api/src/metrics.rs create mode 100644 services/api/src/server/errors.rs create mode 100644 services/api/src/server/handlers/blocks.rs create mode 100644 services/api/src/server/handlers/mod.rs create mode 100644 services/api/src/server/mod.rs create mode 100644 services/api/src/server/state.rs diff --git a/Cargo.lock b/Cargo.lock index 71ef6fa5..6cb93ea2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9218,6 +9218,39 @@ version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "734676eb262c623cec13c3155096e08d1f8f29adce39ba17948b18dad1e54142" +[[package]] +name = "sv-api" +version = "0.0.25" +dependencies = [ + "actix-web", + "actix-ws", + "anyhow", + "async-trait", + "clap 4.5.27", + "displaydoc", + "dotenvy", + "fuel-data-parser", + "fuel-streams-core", + "fuel-streams-domains", + "fuel-streams-store", + "fuel-tx 0.58.2", + "fuel-vm 0.58.2", + "fuel-web-utils", + "netlink-proto", + "num_cpus", + "openssl", + "prometheus", + "serde", + "serde_json", + "sqlx", + "thiserror 2.0.11", + "time", + "tokio", + "tracing", + "tracing-subscriber", + "validator", +] + [[package]] name = "sv-consumer" version = "0.0.25" diff --git a/Makefile b/Makefile index 6f71fc1a..91ea719d 100644 --- a/Makefile +++ b/Makefile @@ -275,6 +275,29 @@ run-webserver-testnet-dev: run-webserver-testnet-profiling: $(MAKE) run-webserver NETWORK=testnet MODE=profiling +# ------------------------------------------------------------ +# Api Run Commands +# ------------------------------------------------------------ + +run-api: NETWORK="testnet" +run-api: MODE="dev" +run-api: PORT="9004" +run-api: EXTRA_ARGS="" +run-api: check-network + @./scripts/run_api.sh --mode $(MODE) --port $(PORT) --extra-args $(EXTRA_ARGS) + +run-api-mainnet-dev: + $(MAKE) run-api NETWORK=mainnet MODE=dev + +run-api-mainnet-profiling: + $(MAKE) run-api NETWORK=mainnet MODE=profiling + +run-api-testnet-dev: + $(MAKE) run-api NETWORK=testnet MODE=dev + +run-api-testnet-profiling: + $(MAKE) run-api NETWORK=testnet MODE=profiling + # ------------------------------------------------------------ # Docker Compose # ------------------------------------------------------------ diff --git a/Tiltfile b/Tiltfile index ebdeae46..9b19231a 100755 --- a/Tiltfile +++ b/Tiltfile @@ -80,6 +80,29 @@ custom_build( ignore=['./target'] ) +# api server +custom_build( + ref='sv-api:latest', + image_deps=['sv-consumer:latest', 'sv-publisher:latest'], + command=[ + './cluster/scripts/build_docker.sh', + '--dockerfile', './cluster/docker/sv-api.Dockerfile' + ], + deps=[ + './src', + './Cargo.toml', + './Cargo.lock', + './cluster/docker/sv-api.Dockerfile' + ], + live_update=[ + sync('./src', '/usr/src'), + sync('./Cargo.toml', '/usr/src/Cargo.toml'), + sync('./Cargo.lock', '/usr/src/Cargo.lock'), + run('cargo build', trigger=['./src', './Cargo.toml', './Cargo.lock']) + ], + ignore=['./target'] +) + # Deploy the Helm chart with values from .env # Get deployment mode from environment variable, default to 'full' config_mode = os.getenv('CLUSTER_MODE', 'full') @@ -107,6 +130,13 @@ RESOURCES = { 'config_mode': ['minimal', 'full'], 'deps': ['fuel-streams-nats'] }, + 'sv-api': { + 'name': 'fuel-streams-sv-api', + 'ports': ['9004:9004'], + 'labels': 'api', + 'config_mode': ['minimal', 'full'], + 'deps': ['fuel-streams-sv-publisher', 'fuel-streams-sv-consumer'] + }, 'nats': { 'name': 'fuel-streams-nats', 'ports': ['4222:4222', '6222:6222', '7422:7422'], diff --git a/cluster/charts/fuel-streams/dashboards/api-metrics.json b/cluster/charts/fuel-streams/dashboards/api-metrics.json new file mode 100644 index 00000000..a3fc0c67 --- /dev/null +++ b/cluster/charts/fuel-streams/dashboards/api-metrics.json @@ -0,0 +1,496 @@ +{ + "annotations": { + "list": [ + { + "builtIn": 1, + "datasource": { + "type": "grafana", + "uid": "-- Grafana --" + }, + "enable": true, + "hide": true, + "iconColor": "rgba(0, 211, 255, 1)", + "name": "Annotations & Alerts", + "type": "dashboard" + } + ] + }, + "links": [ + { + "asDropdown": true, + "icon": "external-link-alt", + "includeVars": true, + "keepTime": true, + "tags": [ + "fuel-streams" + ], + "targetBlank": false, + "title": "Related Dashboards", + "type": "dashboards" + } + ], + "panels": [ + { + "datasource": { + "type": "prometheus", + "uid": "DS__PROMETHEUS" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "barWidthFactor": 0.6, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 35 + }, + "id": 16, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "pluginVersion": "11.3.1", + "targets": [ + { + "editorMode": "code", + "expr": "ws_streamer_metrics_total_subscriptions", + "legendFormat": "__auto", + "range": true, + "refId": "A" + } + ], + "title": "Ws Total Subs", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "DS__PROMETHEUS" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "barWidthFactor": 0.6, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [ + { + "__systemRef": "hideSeriesFrom", + "matcher": { + "id": "byNames", + "options": { + "mode": "exclude", + "names": [ + "caleb_similique" + ], + "prefix": "All except:", + "readOnly": true + } + }, + "properties": [ + { + "id": "custom.hideFrom", + "value": { + "legend": false, + "tooltip": false, + "viz": true + } + } + ] + } + ] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 35 + }, + "id": 14, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "pluginVersion": "11.3.1", + "targets": [ + { + "editorMode": "code", + "expr": "avg(ws_user_active_subscriptions) by (user_name)", + "legendFormat": "__auto", + "range": true, + "refId": "A" + } + ], + "title": "Ws Average active subscriptions per user", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "DS__PROMETHEUS" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "barWidthFactor": 0.6, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [ + { + "__systemRef": "hideSeriesFrom", + "matcher": { + "id": "byNames", + "options": { + "mode": "exclude", + "names": [ + "{__name__=\"ws_streamer_metrics_total_subscriptions\", instance=\"192.168.1.143:9003\", job=\"sv-webserver\"}" + ], + "prefix": "All except:", + "readOnly": true + } + }, + "properties": [ + { + "id": "custom.hideFrom", + "value": { + "legend": false, + "tooltip": false, + "viz": true + } + } + ] + } + ] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 43 + }, + "id": 13, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "pluginVersion": "11.3.1", + "targets": [ + { + "editorMode": "code", + "expr": "ws_streamer_metrics_total_subscriptions", + "legendFormat": "__auto", + "range": true, + "refId": "A" + } + ], + "title": "Ws Active connections", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "DS__PROMETHEUS" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "barWidthFactor": 0.6, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 43 + }, + "id": 15, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "pluginVersion": "11.3.1", + "targets": [ + { + "editorMode": "code", + "expr": "rate(ws_connection_duration_seconds_sum[5m]) / rate(ws_connection_duration_seconds_count[5m])", + "legendFormat": "__auto", + "range": true, + "refId": "A" + } + ], + "title": "Average connection duration", + "type": "timeseries" + } + ], + "refresh": "10s", + "timepicker": { + "refresh_intervals": [ + "5s", + "10s", + "30s", + "1m", + "5m", + "15m", + "30m", + "1h", + "2h" + ], + "time_options": [ + "5m", + "15m", + "1h", + "6h", + "12h", + "24h", + "2d", + "7d", + "30d" + ] + }, + "editable": true, + "fiscalYearStartMonth": 0, + "graphTooltip": 0, + "time": { + "from": "now-6h", + "to": "now" + }, + "tags": [ + "fuel-streams", + "webserver" + ], + "timezone": "browser", + "title": "Fuel Streams / Webserver Metrics", + "uid": "webserver-metrics", + "version": 1, + "weekStart": "", + "schemaVersion": 36, + "style": "dark", + "liveNow": false +} diff --git a/cluster/charts/fuel-streams/templates/api/certificate.yaml b/cluster/charts/fuel-streams/templates/api/certificate.yaml new file mode 100644 index 00000000..5edd1617 --- /dev/null +++ b/cluster/charts/fuel-streams/templates/api/certificate.yaml @@ -0,0 +1,68 @@ +{{- $api := .Values.api }} +{{- $service := $api.service }} +{{- $tls := $api.tls }} +{{- $certificate := $tls.certificate }} +{{- $ingress := $tls.ingress }} +{{- $component := "api" -}} +{{- $secretName := printf "%s-api-tls" (include "fuel-streams.fullname" .) -}} +{{- $serviceName := printf "%s-api" (include "fuel-streams.fullname" .) -}} +{{- $certificateDict := dict "root" . "context" $certificate "name" "api-cert" "component" $component -}} +{{- $ingressDict := dict "root" . "context" $ingress "name" "api-cert-validator" "component" $component -}} +{{- if and $api.enabled $service.host }} +{{- if $tls.enabled }} +--- +apiVersion: cert-manager.io/v1 +kind: Certificate +metadata: + {{- include "k8s.resource-metadata" $certificateDict | nindent 2 }} +spec: + secretName: {{ $secretName }} + duration: {{ $certificate.duration }} + renewBefore: {{ $certificate.renewBefore }} + dnsNames: + - {{ $service.host }} + issuerRef: + name: {{ $certificate.issuer }} + kind: ClusterIssuer +{{- end }} +--- +apiVersion: networking.k8s.io/v1 +kind: Ingress +metadata: + {{ include "k8s.resource-metadata.default" $ingressDict | nindent 2 }} + {{ include "k8s.resource-metadata.labels" $ingressDict | nindent 2 }} + annotations: + kubernetes.io/ingress.class: nginx + nginx.ingress.kubernetes.io/proxy-body-size: "0" + nginx.ingress.kubernetes.io/backend-protocol: "HTTP" + nginx.ingress.kubernetes.io/websocket-services: {{ $serviceName }} + external-dns.alpha.kubernetes.io/hostname: {{ $service.host }} + external-dns.alpha.kubernetes.io/cloudflare-proxied: "false" + {{- if $tls.enabled }} + ingress.kubernetes.io/ssl-redirect: "true" + nginx.ingress.kubernetes.io/ssl-redirect: "true" + acme.cert-manager.io/http01-ingress-class: nginx + cert-manager.io/common-name: {{ $service.host }} + cert-manager.io/cluster-issuer: {{ $certificate.issuer }} + {{- end }} + {{- include "k8s.resource-metadata.annotations-raw" $ingressDict | nindent 4 }} +spec: + ingressClassName: nginx + {{- if $tls.enabled }} + tls: + - hosts: + - {{ $service.host }} + secretName: {{ $secretName }} + {{- end }} + rules: + - host: {{ $service.host }} + http: + paths: + - path: / + pathType: Prefix + backend: + service: + name: {{ $serviceName }} + port: + number: {{ $service.port }} +{{- end }} diff --git a/cluster/charts/fuel-streams/templates/api/deployment.yaml b/cluster/charts/fuel-streams/templates/api/deployment.yaml new file mode 100644 index 00000000..6c703932 --- /dev/null +++ b/cluster/charts/fuel-streams/templates/api/deployment.yaml @@ -0,0 +1,25 @@ +{{- $api := .Values.api -}} +{{- $tls := $api.tls -}} +{{- $name := "api" -}} +{{- $component := "api" -}} +{{- $serviceDict := dict "root" . "context" $api "name" $name "component" $component -}} +{{- if $api.enabled -}} +apiVersion: apps/v1 +kind: Deployment +metadata: + {{- include "k8s.resource-metadata" $serviceDict | nindent 2 }} +spec: + {{- include "k8s.pod-spec" $serviceDict | nindent 2 }} + template: + {{- include "k8s.template-metadata" $serviceDict | nindent 4 }} + spec: + {{- include "k8s.pod-config" $serviceDict | nindent 6 }} + containers: + - name: api + {{- include "k8s.container-config.image" $serviceDict | nindent 10 }} + {{- include "k8s.container-config" $serviceDict | nindent 10 }} +{{ include "k8s.hpa" $serviceDict }} +{{ include "k8s.service" $serviceDict }} +{{ include "k8s.service-monitor" $serviceDict }} +{{ include "k8s.grafana-dashboard" $serviceDict }} +{{- end }} diff --git a/cluster/charts/fuel-streams/values-local.yaml b/cluster/charts/fuel-streams/values-local.yaml index e7f6acb6..b07caaf8 100644 --- a/cluster/charts/fuel-streams/values-local.yaml +++ b/cluster/charts/fuel-streams/values-local.yaml @@ -60,6 +60,20 @@ webserver: tls: enabled: false +api: + enabled: false + image: + repository: sv-api + pullPolicy: IfNotPresent + tag: latest + + service: + enabled: true + port: 9004 + + tls: + enabled: false + # NATS Core configuration for local development nats: enabled: false diff --git a/cluster/charts/fuel-streams/values.yaml b/cluster/charts/fuel-streams/values.yaml index 7699bf4c..115f5703 100755 --- a/cluster/charts/fuel-streams/values.yaml +++ b/cluster/charts/fuel-streams/values.yaml @@ -332,6 +332,80 @@ webserver: autoscaling: enabled: true + +# ------------------------------------------------------------------------------ +# Api configuration +# ------------------------------------------------------------------------------ + +api: + enabled: true + network: mainnet + + port: 8080 + ports: [] + + image: + repository: ghcr.io/fuellabs/sv-api + pullPolicy: Always + tag: latest + + service: + enabled: true + port: 8080 + type: ClusterIP + host: "stream-staging.fuel.network" + loadBalancerClass: service.k8s.aws/nlb + config: + annotations: {} + labels: {} + + prometheus: + enabled: true + scrape: true + path: /api/v1/metrics + config: + annotations: {} + labels: + release: kube-prometheus-stack + + tls: + enabled: true + certificate: + issuer: letsencrypt-prod + duration: 2160h + renewBefore: 360h + config: + annotations: {} + labels: {} + ingress: + config: + annotations: {} + labels: {} + + # You can override the env variables for the container here + # using a map or an array of key-value pairs + envFrom: [] + env: {} + + config: + replicaCount: 3 + labels: {} + annotations: {} + podAnnotations: {} + nodeSelector: {} + tolerations: [] + affinity: {} + imagePullSecrets: [] + livenessProbe: {} + readinessProbe: {} + startupProbe: {} + podSecurityContext: {} + containerSecurityContext: {} + resources: {} + + autoscaling: + enabled: true + # ------------------------------------------------------------------------------ # NATS Core configuration # ------------------------------------------------------------------------------ diff --git a/cluster/docker/docker-compose.yml b/cluster/docker/docker-compose.yml index f0ee243a..6b8474a2 100644 --- a/cluster/docker/docker-compose.yml +++ b/cluster/docker/docker-compose.yml @@ -50,6 +50,14 @@ services: timeout: 5s retries: 5 + adminer: + image: adminer:latest + container_name: adminer + depends_on: + - postgres + ports: + - 8085:8080 + prometheus: profiles: - all diff --git a/cluster/docker/sv-api.Dockerfile b/cluster/docker/sv-api.Dockerfile new file mode 100644 index 00000000..a877a807 --- /dev/null +++ b/cluster/docker/sv-api.Dockerfile @@ -0,0 +1,75 @@ +# Stage 1: Build +FROM --platform=$BUILDPLATFORM tonistiigi/xx AS xx +FROM --platform=$BUILDPLATFORM rust:1.84.0 AS chef + +ARG TARGETPLATFORM +RUN cargo install cargo-chef && rustup target add wasm32-unknown-unknown +WORKDIR /build/ + +COPY --from=xx / / + +# hadolint ignore=DL3008 +RUN apt-get update && \ + apt-get install -y --no-install-recommends \ + lld \ + clang \ + libclang-dev \ + && xx-apt-get update \ + && xx-apt-get install -y libc6-dev g++ binutils \ + && apt-get clean \ + && rm -rf /var/lib/apt/lists/* + + +FROM chef AS planner +ENV CARGO_NET_GIT_FETCH_WITH_CLI=true +COPY . . +RUN cargo chef prepare --recipe-path recipe.json + + +FROM chef AS builder +ARG DEBUG_SYMBOLS=false +ENV CARGO_NET_GIT_FETCH_WITH_CLI=true +ENV CARGO_PROFILE_RELEASE_DEBUG=$DEBUG_SYMBOLS +COPY --from=planner /build/recipe.json recipe.json +RUN echo $CARGO_PROFILE_RELEASE_DEBUG +# Build our project dependencies, not our application! +RUN \ + --mount=type=cache,target=/usr/local/cargo/registry/index \ + --mount=type=cache,target=/usr/local/cargo/registry/cache \ + --mount=type=cache,target=/usr/local/cargo/git/db \ + --mount=type=cache,target=/build/target \ + xx-cargo chef cook --release --no-default-features -p sv-api --recipe-path recipe.json +# Up to this point, if our dependency tree stays the same, +# all layers should be cached. +COPY . . +# build application +RUN \ + --mount=type=cache,target=/usr/local/cargo/registry/index \ + --mount=type=cache,target=/usr/local/cargo/registry/cache \ + --mount=type=cache,target=/usr/local/cargo/git/db \ + --mount=type=cache,target=/build/target \ + xx-cargo build --release --no-default-features -p sv-api \ + && xx-verify ./target/$(xx-cargo --print-target-triple)/release/sv-api \ + && cp ./target/$(xx-cargo --print-target-triple)/release/sv-api /root/sv-api \ + && cp ./target/$(xx-cargo --print-target-triple)/release/sv-api.d /root/sv-api.d + +# Stage 2: Run +FROM ubuntu:22.04 AS run + +ARG PORT=9003 +ENV PORT=$PORT + +WORKDIR /usr/src + +RUN apt-get update -y \ + && apt-get install -y --no-install-recommends ca-certificates curl \ + # Clean up + && apt-get autoremove -y \ + && apt-get clean -y \ + && rm -rf /var/lib/apt/lists/* + +COPY --from=builder /root/sv-api . +COPY --from=builder /root/sv-api.d . + +EXPOSE ${PORT} +CMD ["./sv-api"] diff --git a/scripts/run_api.sh b/scripts/run_api.sh new file mode 100755 index 00000000..69bbdcc0 --- /dev/null +++ b/scripts/run_api.sh @@ -0,0 +1,83 @@ +#!/bin/bash + +# Exit immediately if a command exits with a non-zero status +set -e + +# Load environment variables with defaults +PORT=${PORT:-9004} +MODE=${MODE:-dev} +EXTRA_ARGS=${EXTRA_ARGS:-""} + +# ------------------------------ +# Function to Display Usage +# ------------------------------ +usage() { + echo "Usage: $0 [options]" + echo "Options:" + echo " --mode : Specify the run mode (dev|profiling)" + echo " --port : Port number for the API server (default: 9004)" + echo " --extra-args : Optional additional arguments to append (in quotes)" + echo "" + echo "Examples:" + echo " $0 # Runs with all defaults" + echo " $0 --mode dev --port 8080 # Custom port" + echo " $0 --mode dev --extra-args '\"--use-metrics\"' # Enable metrics" + exit 1 +} + +while [[ "$#" -gt 0 ]]; do + case $1 in + --mode) + MODE="$2" + shift 2 + ;; + --port) + PORT="$2" + shift 2 + ;; + --extra-args) + EXTRA_ARGS="$2" + shift 2 + ;; + --help) + usage + ;; + *) + echo "Error: Unknown parameter passed: $1" >&2 + usage + ;; + esac +done + +# ------------------------------ +# Load Environment +# ------------------------------ +source ./scripts/set_env.sh + +# Print the configuration being used +echo -e "\n==========================================" +echo "⚙️ Configuration" +echo -e "==========================================" + +# Runtime Configuration +echo "Runtime Settings:" +echo "→ Mode: ${MODE:-dev}" +echo "→ API Port: ${PORT:-9004}" +if [ -n "$EXTRA_ARGS" ]; then + echo "→ Extra Arguments: $EXTRA_ARGS" +fi + +echo -e "==========================================\n" + +# Define common arguments +COMMON_ARGS=( + "--port" "${PORT:-9003}" +) + +# Execute based on mode +if [ "${MODE:-dev}" == "dev" ]; then + cargo run -p sv-api -- "${COMMON_ARGS[@]}" ${EXTRA_ARGS} +else + cargo build --profile profiling --package sv-api + samply record ./target/profiling/sv-api "${COMMON_ARGS[@]}" ${EXTRA_ARGS} +fi diff --git a/services/api/Cargo.toml b/services/api/Cargo.toml new file mode 100644 index 00000000..9953c416 --- /dev/null +++ b/services/api/Cargo.toml @@ -0,0 +1,67 @@ +[package] +name = "sv-api" +description = "Fuel library for retrieving data from a fuel indexed database" +authors.workspace = true +edition.workspace = true +homepage.workspace = true +keywords.workspace = true +license.workspace = true +repository.workspace = true +rust-version.workspace = true +version.workspace = true +publish = false + +[[bin]] +name = "sv-api" +path = "src/main.rs" + +[dependencies] +actix-web.workspace = true +actix-ws = "0.3.0" +anyhow.workspace = true +async-trait.workspace = true +clap.workspace = true +displaydoc.workspace = true +dotenvy.workspace = true +fuel-data-parser.workspace = true +fuel-streams-core.workspace = true +fuel-streams-domains.workspace = true +fuel-streams-store.workspace = true +fuel-tx.workspace = true +fuel-vm.workspace = true +fuel-web-utils.workspace = true +num_cpus.workspace = true +prometheus = { version = "0.13", features = ["process"] } +serde.workspace = true +serde_json.workspace = true +sqlx = { workspace = true, default-features = false, features = [ + "any", + "macros", + "postgres", + "runtime-tokio", + "tls-native-tls", +] } +thiserror = "2.0" +time = { version = "0.3", features = ["serde"] } +tokio.workspace = true +tracing.workspace = true +tracing-subscriber = { version = "0.3.19", features = ["env-filter"] } +validator = { version = "0.19", features = ["derive"] } + +# these dependencies need to update in the future when fuel-core 0.41.4 is on mainnet +[target.'cfg(target_os = "linux")'.dependencies] +netlink-proto = { version = "=0.11.3", optional = true } + +# in an individual package Cargo.toml +[package.metadata.cargo-machete] +ignored = ["fuel-data-parser"] + +[features] +default = [] +test-helpers = ["fuel-data-parser/test-helpers"] + +[target.x86_64-unknown-linux-gnu.dependencies] +openssl = { version = "0.10.68", features = ["vendored"] } + +[target.x86_64-unknown-linux-musl.dependencies] +openssl = { version = "0.10.68", features = ["vendored"] } diff --git a/services/api/README.md b/services/api/README.md new file mode 100644 index 00000000..41c619ff --- /dev/null +++ b/services/api/README.md @@ -0,0 +1,46 @@ +
+
+ + Logo + +

Fuel Streams Api

+

+ A binary webserver with a rest api for retrieving fuel-specific data from an indexed database +

+

+ + CI + + + Coverage + +

+

+ 📚 Documentation +   + 🐛 Report Bug +   + ✨ Request Feature +

+
+ +## 📝 About The Project + +A binary webserver with a rest api for retrieving fuel-specific data from an indexed database. + +## ⚡️ Getting Started + +### Prerequisites + +- [Rust toolchain](https://www.rust-lang.org/tools/install) +- [Docker](https://www.docker.com/get-started/) (optional) + +## 🤝 Contributing + +Contributions are welcome! Please feel free to submit a Pull Request. + +For more information on contributing, please see the [CONTRIBUTING.md](../../CONTRIBUTING.md) file in the root of the repository. + +## 📜 License + +This project is licensed under the `Apache-2.0` license. See [`LICENSE`](../../LICENSE) for more information. diff --git a/services/api/src/cli.rs b/services/api/src/cli.rs new file mode 100644 index 00000000..d9a0853d --- /dev/null +++ b/services/api/src/cli.rs @@ -0,0 +1,34 @@ +use clap::Parser; + +/// CLI structure for parsing command-line arguments. +#[derive(Clone, Parser)] +pub struct Cli { + /// API port number + #[arg( + long, + value_name = "PORT", + env = "PORT", + default_value = "9003", + help = "Port number for the API server" + )] + pub port: u16, + + /// Database URL to connect to. + #[arg( + long, + value_name = "DATABASE_URL", + env = "DATABASE_URL", + default_value = "postgresql://root@localhost:26257/defaultdb?sslmode=disable", + help = "Database URL to connect to." + )] + pub db_url: String, + + /// Use metrics + #[arg( + long, + env = "USE_METRICS", + default_value = "false", + help = "Enable metrics" + )] + pub use_metrics: bool, +} diff --git a/services/api/src/config.rs b/services/api/src/config.rs new file mode 100644 index 00000000..9b431ca2 --- /dev/null +++ b/services/api/src/config.rs @@ -0,0 +1,58 @@ +use std::path::PathBuf; + +use clap::Parser; +use displaydoc::Display as DisplayDoc; +use thiserror::Error; + +#[derive(Debug, DisplayDoc, Error)] +pub enum Error { + /// Undecodable config element: {0} + UndecodableConfigElement(&'static str), +} + +#[derive(Debug, Default, Clone)] +pub struct S3Config { + pub enabled: bool, +} + +#[derive(Clone, Debug)] +pub struct TlsConfig { + pub private_key: PathBuf, + pub certificate: PathBuf, +} + +#[derive(Clone, Debug)] +pub struct ApiConfig { + pub port: u16, + pub tls: Option, +} + +#[derive(Clone, Debug)] +pub struct DbConfig { + pub url: String, +} + +#[derive(Clone, Debug)] +pub struct Config { + pub api: ApiConfig, + pub db: DbConfig, +} + +impl Config { + pub fn load() -> Result { + let cli = crate::cli::Cli::parse(); + Self::from_cli(&cli) + } + + fn from_cli(cli: &crate::cli::Cli) -> Result { + Ok(Config { + api: ApiConfig { + port: cli.port, + tls: None, + }, + db: DbConfig { + url: cli.db_url.clone(), + }, + }) + } +} diff --git a/services/api/src/lib.rs b/services/api/src/lib.rs new file mode 100644 index 00000000..3561bbb7 --- /dev/null +++ b/services/api/src/lib.rs @@ -0,0 +1,23 @@ +pub mod cli; +pub mod config; +pub mod metrics; +pub mod server; + +use std::sync::LazyLock; + +pub static STREAMER_MAX_WORKERS: LazyLock = LazyLock::new(|| { + let available_cpus = num_cpus::get(); + let default_threads = 2 * available_cpus; + + dotenvy::var("STREAMER_MAX_WORKERS") + .ok() + .and_then(|val| val.parse().ok()) + .unwrap_or(default_threads) +}); + +pub static API_KEY_MAX_CONN_LIMIT: LazyLock> = + LazyLock::new(|| { + dotenvy::var("API_KEY_MAX_CONN_LIMIT") + .ok() + .and_then(|val| val.parse::().ok()) + }); diff --git a/services/api/src/main.rs b/services/api/src/main.rs new file mode 100644 index 00000000..1e14edba --- /dev/null +++ b/services/api/src/main.rs @@ -0,0 +1,40 @@ +use fuel_web_utils::server::api::{spawn_web_server, ApiServerBuilder}; +use sv_api::{ + config::Config, + server::{handlers, state::ServerState}, +}; +use tracing::level_filters::LevelFilter; +use tracing_subscriber::{fmt::format::FmtSpan, EnvFilter}; + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + // init tracing + tracing_subscriber::fmt() + .with_env_filter( + EnvFilter::builder() + .with_default_directive(LevelFilter::INFO.into()) + .from_env_lossy(), + ) + .with_span_events(FmtSpan::CLOSE) + .init(); + + if let Err(err) = dotenvy::dotenv() { + tracing::warn!("File .env not found: {:?}", err); + } + + let config = Config::load()?; + let server_state = ServerState::new(&config).await?; + let server = ApiServerBuilder::new(config.api.port, server_state.clone()) + .with_dynamic_routes(handlers::create_services(server_state)) + .build()?; + let server_handle = server.handle(); + let server_task = spawn_web_server(server).await; + let _ = tokio::join!(server_task); + + // Await the Actix server shutdown + tracing::info!("Stopping actix server ..."); + server_handle.stop(true).await; + tracing::info!("Actix server stopped. Goodbye!"); + + Ok(()) +} diff --git a/services/api/src/metrics.rs b/services/api/src/metrics.rs new file mode 100644 index 00000000..0def0504 --- /dev/null +++ b/services/api/src/metrics.rs @@ -0,0 +1,349 @@ +use std::time::Duration; + +use async_trait::async_trait; +use fuel_streams_core::server::Subscription; +use fuel_web_utils::{ + server::middlewares::api_key::ApiKeyUserId, + telemetry::metrics::TelemetryMetrics, +}; +use prometheus::{ + register_histogram_vec, + register_int_counter_vec, + register_int_gauge_vec, + HistogramVec, + IntCounterVec, + IntGaugeVec, + Registry, +}; + +#[derive(Debug)] +pub enum SubscriptionChange { + Added, + Removed, +} + +#[derive(Clone, Debug)] +pub struct Metrics { + pub registry: Registry, + pub total_ws_subs: IntGaugeVec, + pub user_subscribed_messages: IntGaugeVec, + pub subs_messages_throughput: IntCounterVec, + pub subs_messages_error_rates: IntCounterVec, + pub connection_duration: HistogramVec, + pub user_active_subscriptions: IntGaugeVec, + pub subscription_lifetime: HistogramVec, +} + +impl Default for Metrics { + fn default() -> Self { + Metrics::new(None).expect("Failed to create default Metrics") + } +} + +#[async_trait] +impl TelemetryMetrics for Metrics { + fn registry(&self) -> &Registry { + &self.registry + } + + fn metrics(&self) -> Option { + Some(self.clone()) + } +} + +impl Metrics { + pub fn new_with_random_prefix() -> anyhow::Result { + Metrics::new(Some(Metrics::generate_random_prefix())) + } + + pub fn new(prefix: Option) -> anyhow::Result { + let metric_prefix = prefix + .clone() + .map(|p| format!("{}_", p)) + .unwrap_or_default(); + + let total_ws_subs = register_int_gauge_vec!( + format!("{}ws_streamer_metrics_total_subscriptions", metric_prefix), + "A metric counting the number of active ws subscriptions", + &[], + ) + .expect("metric must be created"); + + let user_subscribed_messages = register_int_gauge_vec!( + format!( + "{}ws_streamer_metrics_user_subscribed_messages", + metric_prefix + ), + "A metric counting the number of published messages", + &["user_id", "user_name", "subject"], + ) + .expect("metric must be created"); + + let subs_messages_throughput = register_int_counter_vec!( + format!( + "{}ws_streamer_metrics_subs_messages_throughput", + metric_prefix + ), + "A metric counting the number of subscription messages per subject", + &["subject"], + ) + .expect("metric must be created"); + + let subs_messages_error_rates = + register_int_counter_vec!( + format!("{}ws_streamer_metrics_subs_messages_error_rates", metric_prefix), + "A metric counting errors or failures during subscription message processing", + &["subject", "error_type"], + ) + .expect("metric must be created"); + + let connection_duration = register_histogram_vec!( + format!("{}ws_connection_duration_seconds", metric_prefix), + "Duration of WebSocket connections in seconds", + &["user_id", "user_name"], + vec![0.1, 1.0, 5.0, 10.0, 30.0, 60.0, 300.0, 600.0, 1800.0, 3600.0] + ) + .expect("metric must be created"); + + let user_active_subscriptions = register_int_gauge_vec!( + format!("{}ws_user_active_subscriptions", metric_prefix), + "Number of active subscriptions per user", + &["user_id", "user_name"] + ) + .expect("metric must be created"); + + let subscription_lifetime = register_histogram_vec!( + format!("{}ws_subscription_lifetime_seconds", metric_prefix), + "Duration of individual subscriptions in seconds", + &["user_id", "user_name", "subscription_id"], + vec![0.1, 1.0, 5.0, 10.0, 30.0, 60.0, 300.0, 600.0, 1800.0, 3600.0] + ) + .expect("metric must be created"); + + let registry = + Registry::new_custom(prefix, None).expect("registry to be created"); + registry.register(Box::new(total_ws_subs.clone()))?; + registry.register(Box::new(user_subscribed_messages.clone()))?; + registry.register(Box::new(subs_messages_throughput.clone()))?; + registry.register(Box::new(subs_messages_error_rates.clone()))?; + registry.register(Box::new(connection_duration.clone()))?; + registry.register(Box::new(user_active_subscriptions.clone()))?; + registry.register(Box::new(subscription_lifetime.clone()))?; + + Ok(Self { + registry, + total_ws_subs, + user_subscribed_messages, + subs_messages_throughput, + subs_messages_error_rates, + connection_duration, + user_active_subscriptions, + subscription_lifetime, + }) + } + + pub fn update_user_subscription_metrics( + &self, + user_id: ApiKeyUserId, + user_name: &str, + subject: &str, + ) { + self.user_subscribed_messages + .with_label_values(&[ + user_id.to_string().as_str(), + user_name, + subject, + ]) + .inc(); + + // Increment throughput for the subscribed messages + self.subs_messages_throughput + .with_label_values(&[subject]) + .inc(); + } + + pub fn update_error_metrics(&self, subject: &str, error_type: &str) { + self.subs_messages_error_rates + .with_label_values(&[subject, error_type]) + .inc(); + } + + pub fn increment_subscriptions_count(&self) { + self.total_ws_subs.with_label_values(&[]).inc(); + } + + pub fn decrement_subscriptions_count(&self) { + self.total_ws_subs.with_label_values(&[]).dec(); + } + + pub fn update_unsubscribed( + &self, + user_id: ApiKeyUserId, + user_name: &str, + subject: &str, + ) { + self.user_subscribed_messages + .with_label_values(&[&user_id.to_string(), user_name, subject]) + .dec(); + } + + pub fn update_subscribed( + &self, + user_id: ApiKeyUserId, + user_name: &str, + subject: &str, + ) { + self.user_subscribed_messages + .with_label_values(&[&user_id.to_string(), user_name, subject]) + .inc(); + } + + pub fn track_connection_duration( + &self, + user_id: ApiKeyUserId, + user_name: &str, + duration: Duration, + ) { + self.connection_duration + .with_label_values(&[&user_id.to_string(), user_name]) + .observe(duration.as_secs_f64()); + } + + pub fn update_user_subscription_count( + &self, + user_id: ApiKeyUserId, + user_name: &str, + subject: &str, + change: &SubscriptionChange, + ) { + let delta = match change { + SubscriptionChange::Added => 1, + SubscriptionChange::Removed => -1, + }; + + // Update per-user subscription count + self.user_active_subscriptions + .with_label_values(&[&user_id.to_string(), user_name]) + .add(delta); + + // Update subject-specific count + self.user_subscribed_messages + .with_label_values(&[&user_id.to_string(), user_name, subject]) + .add(delta); + } + + pub fn track_subscription_lifetime( + &self, + user_id: ApiKeyUserId, + user_name: &str, + subscription_id: &Subscription, + duration: Duration, + ) { + self.subscription_lifetime + .with_label_values(&[ + &user_id.to_string(), + user_name, + &subscription_id.to_string(), + ]) + .observe(duration.as_secs_f64()); + } +} + +#[cfg(test)] +mod tests { + use prometheus::{gather, Encoder, TextEncoder}; + + use super::*; + + impl Metrics { + pub fn random() -> Self { + Metrics::new_with_random_prefix() + .expect("Failed to create random Metrics") + } + } + + #[test] + fn test_user_subscribed_messages_metric() { + let metrics = Metrics::random(); + + metrics + .user_subscribed_messages + .with_label_values(&["user_id_1", "user_name_1", "subject_1"]) + .set(5); + + let metric_families = gather(); + let mut buffer = Vec::new(); + let encoder = TextEncoder::new(); + encoder.encode(&metric_families, &mut buffer).unwrap(); + + let output = String::from_utf8(buffer.clone()).unwrap(); + + assert!(output.contains("ws_streamer_metrics_user_subscribed_messages")); + assert!(output.contains("user_id_1")); + assert!(output.contains("user_name_1")); + assert!(output.contains("subject_1")); + assert!(output.contains("5")); + } + + #[test] + fn test_subs_messages_total_metric() { + let metrics = Metrics::random(); + + metrics.total_ws_subs.with_label_values(&[]).set(10); + + let metric_families = gather(); + let mut buffer = Vec::new(); + let encoder = TextEncoder::new(); + encoder.encode(&metric_families, &mut buffer).unwrap(); + + let output = String::from_utf8(buffer.clone()).unwrap(); + + assert!(output.contains("ws_streamer_metrics_total_subscriptions")); + assert!(output.contains("10")); + } + + #[test] + fn test_subs_messages_throughput_metric() { + let metrics = Metrics::random(); + + metrics + .subs_messages_throughput + .with_label_values(&["subject_1"]) + .inc_by(10); + + let metric_families = gather(); + let mut buffer = Vec::new(); + let encoder = TextEncoder::new(); + encoder.encode(&metric_families, &mut buffer).unwrap(); + + let output = String::from_utf8(buffer.clone()).unwrap(); + + assert!(output.contains("ws_streamer_metrics_subs_messages_throughput")); + assert!(output.contains("subject_1")); + assert!(output.contains("10")); + } + + #[test] + fn test_subs_messages_error_rates_metric() { + let metrics = Metrics::random(); + + metrics + .subs_messages_error_rates + .with_label_values(&["subject_1", "timeout"]) + .inc_by(1); + + let metric_families = gather(); + let mut buffer = Vec::new(); + let encoder = TextEncoder::new(); + encoder.encode(&metric_families, &mut buffer).unwrap(); + + let output = String::from_utf8(buffer.clone()).unwrap(); + + assert!( + output.contains("ws_streamer_metrics_subs_messages_error_rates") + ); + assert!(output.contains("subject_1")); + assert!(output.contains("timeout")); + assert!(output.contains("1")); + } +} diff --git a/services/api/src/server/errors.rs b/services/api/src/server/errors.rs new file mode 100644 index 00000000..f0e09c63 --- /dev/null +++ b/services/api/src/server/errors.rs @@ -0,0 +1,109 @@ +use actix_ws::{CloseCode, CloseReason, Closed, ProtocolError}; +use fuel_streams_core::{ + prelude::SubjectPayloadError, + stream::StreamError, + types::{MessagePayloadError, ServerRequestError}, +}; +use fuel_streams_domains::SubjectsError; +use fuel_streams_store::{ + db::DbError, + record::{EncoderError, RecordEntityError}, + store::StoreError, +}; +use tokio::task::JoinError; + +/// Ws Subscription-related errors +#[derive(Debug, thiserror::Error)] +pub enum WebsocketError { + #[error("Connection closed with reason: {code} - {description}")] + ClosedWithReason { code: u16, description: String }, + #[error("Connection closed")] + Closed(#[from] Closed), + #[error("Unsupported message type")] + UnsupportedMessageType, + #[error(transparent)] + ProtocolError(#[from] ProtocolError), + #[error("Failed to send message")] + SendError, + #[error("Client timeout")] + Timeout, + #[error("Subscribe failed: {0}")] + Subscribe(String), + #[error("Unsubscribe failed: {0}")] + Unsubscribe(String), + + #[error(transparent)] + JoinHandle(#[from] JoinError), + #[error(transparent)] + ServerRequest(#[from] ServerRequestError), + #[error(transparent)] + StreamError(#[from] StreamError), + #[error(transparent)] + Serde(#[from] serde_json::Error), + #[error(transparent)] + Encoder(#[from] EncoderError), + #[error(transparent)] + Database(#[from] DbError), + #[error(transparent)] + Store(#[from] StoreError), + #[error(transparent)] + SubjectPayload(#[from] SubjectPayloadError), + #[error(transparent)] + MessagePayload(#[from] MessagePayloadError), + #[error(transparent)] + Subjects(#[from] SubjectsError), + #[error(transparent)] + RecordEntity(#[from] RecordEntityError), +} + +impl From for CloseReason { + fn from(error: WebsocketError) -> Self { + CloseReason { + code: match &error { + // Error type + WebsocketError::StreamError(_) + | WebsocketError::JoinHandle(_) + | WebsocketError::Subscribe(_) + | WebsocketError::Unsubscribe(_) + | WebsocketError::Database(_) + | WebsocketError::Store(_) + | WebsocketError::SendError => CloseCode::Error, + + // Invalid type + WebsocketError::Encoder(_) + | WebsocketError::SubjectPayload(_) + | WebsocketError::MessagePayload(_) => CloseCode::Invalid, + + // Unsupported type + WebsocketError::Serde(_) + | WebsocketError::ServerRequest(_) + | WebsocketError::UnsupportedMessageType => { + CloseCode::Unsupported + } + + // Away type + WebsocketError::Closed(_) | WebsocketError::Timeout => { + CloseCode::Away + } + + // Other types + WebsocketError::ClosedWithReason { code, .. } => { + CloseCode::Other(code.to_owned()) + } + WebsocketError::ProtocolError(_) => CloseCode::Protocol, + WebsocketError::Subjects(_) => CloseCode::Error, + WebsocketError::RecordEntity(_) => CloseCode::Error, + }, + description: Some(error.to_string()), + } + } +} + +impl From for WebsocketError { + fn from(reason: CloseReason) -> Self { + WebsocketError::ClosedWithReason { + code: reason.code.into(), + description: reason.description.unwrap_or_default(), + } + } +} diff --git a/services/api/src/server/handlers/blocks.rs b/services/api/src/server/handlers/blocks.rs new file mode 100644 index 00000000..6e6d1e07 --- /dev/null +++ b/services/api/src/server/handlers/blocks.rs @@ -0,0 +1,86 @@ +use actix_web::{http::StatusCode, web, HttpRequest, HttpResponse, Result}; +use fuel_web_utils::server::middlewares::api_key::{ApiKey, DbUserApiKey}; +use serde::{Deserialize, Serialize}; +use sqlx::{Pool, Postgres}; +use validator::Validate; + +use crate::server::state::ServerState; + +#[derive(Debug, thiserror::Error)] +pub enum Error { + #[error("Database error {0}")] + Sqlx(#[from] sqlx::Error), + #[error("Validation error {0}")] + Validation(#[from] validator::ValidationErrors), +} + +impl From for actix_web::Error { + fn from(err: Error) -> Self { + match err { + Error::Sqlx(e) => actix_web::error::InternalError::new( + e, + StatusCode::INTERNAL_SERVER_ERROR, + ) + .into(), + Error::Validation(e) => { + actix_web::error::InternalError::new(e, StatusCode::BAD_REQUEST) + .into() + } + } + } +} + +#[derive(Debug, Serialize, Deserialize, Validate)] +#[serde(rename_all = "camelCase")] +pub struct GenerateApiKeyRequest { + #[validate(length(min = 6))] + pub username: String, +} + +#[derive(Debug, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct GenerateApiKeyResponse { + pub user_id: i64, + pub username: String, + pub api_key: String, +} + +impl From<&DbUserApiKey> for GenerateApiKeyResponse { + fn from(api_key: &DbUserApiKey) -> Self { + Self { + user_id: api_key.user_id as i64, + username: api_key.user_name.clone(), + api_key: api_key.api_key.clone(), + } + } +} + +async fn insert_api_key( + request: &GenerateApiKeyRequest, + tx: &Pool, +) -> Result { + let db_record = sqlx::query_as::<_, DbUserApiKey>( + "INSERT INTO api_keys (user_name, api_key) + VALUES ($1, $2) + RETURNING user_id, user_name, api_key", + ) + .bind(&request.username) + .bind(ApiKey::generate_random_api_key()) + .fetch_one(tx) + .await?; + Ok(db_record) +} + +pub async fn generate_api_key( + req: HttpRequest, + req_body: web::Json, + state: web::Data, +) -> actix_web::Result { + let _api_key = ApiKey::from_req(&req)?; + let req = req_body.into_inner(); + req.validate().map_err(Error::Validation)?; + let db_record = insert_api_key(&req, &state.db.pool) + .await + .map_err(Error::Sqlx)?; + Ok(HttpResponse::Ok().json(GenerateApiKeyResponse::from(&db_record))) +} diff --git a/services/api/src/server/handlers/mod.rs b/services/api/src/server/handlers/mod.rs new file mode 100644 index 00000000..4ebd5c89 --- /dev/null +++ b/services/api/src/server/handlers/mod.rs @@ -0,0 +1,31 @@ +pub mod blocks; + +use actix_web::web; +use fuel_web_utils::server::{ + api::with_prefixed_route, + middlewares::api_key::middleware::ApiKeyAuth, +}; + +use super::handlers; +use crate::server::state::ServerState; + +pub fn create_services( + state: ServerState, +) -> impl Fn(&mut web::ServiceConfig) + Send + Sync + 'static { + move |cfg: &mut web::ServiceConfig| { + cfg.app_data(web::Data::new(state.clone())); + cfg.service( + web::resource(format!( + "{}/{}", + with_prefixed_route("entities"), + "blocks" + )) + .wrap(ApiKeyAuth::new(&state.api_keys_manager)) + .route(web::post().to({ + move |req, body, state: web::Data| { + handlers::blocks::generate_api_key(req, body, state) + } + })), + ); + } +} diff --git a/services/api/src/server/mod.rs b/services/api/src/server/mod.rs new file mode 100644 index 00000000..a739aef7 --- /dev/null +++ b/services/api/src/server/mod.rs @@ -0,0 +1,3 @@ +pub mod errors; +pub mod handlers; +pub mod state; diff --git a/services/api/src/server/state.rs b/services/api/src/server/state.rs new file mode 100644 index 00000000..3cc3e710 --- /dev/null +++ b/services/api/src/server/state.rs @@ -0,0 +1,80 @@ +use std::{ + sync::Arc, + time::{Duration, Instant}, +}; + +use async_trait::async_trait; +use fuel_streams_store::db::{Db, DbConnectionOpts}; +use fuel_web_utils::{ + server::{ + middlewares::api_key::{ApiKeysManager, KeyStorage}, + state::StateProvider, + }, + telemetry::Telemetry, +}; + +use crate::{config::Config, metrics::Metrics, API_KEY_MAX_CONN_LIMIT}; + +#[derive(Clone)] +pub struct ServerState { + pub start_time: Instant, + pub telemetry: Arc>, + pub db: Arc, + pub api_keys_manager: Arc, +} + +impl ServerState { + pub async fn new(config: &Config) -> anyhow::Result { + let db = Db::new(DbConnectionOpts { + connection_str: config.db.url.clone(), + ..Default::default() + }) + .await? + .arc(); + tracing::info!("Connected to database at {}", config.db.url); + + let metrics = Metrics::new(None)?; + let telemetry = Telemetry::new(Some(metrics)).await?; + telemetry.start().await?; + tracing::info!("Initialized telemetry"); + + let api_keys_manager = + Arc::new(ApiKeysManager::new(&db, *API_KEY_MAX_CONN_LIMIT)); + let initial_keys = api_keys_manager.load_from_db().await?; + for key in initial_keys { + if let Err(e) = api_keys_manager.storage.insert(&key) { + tracing::warn!( + error = %e, + "Failed to cache initial API key" + ); + } + } + tracing::info!("Initialized api key manager"); + + Ok(Self { + db, + start_time: Instant::now(), + telemetry, + api_keys_manager, + }) + } + + pub fn uptime(&self) -> Duration { + self.start_time.elapsed() + } +} + +#[async_trait] +impl StateProvider for ServerState { + async fn is_healthy(&self) -> bool { + true + } + + async fn get_health(&self) -> serde_json::Value { + serde_json::json!({}) + } + + async fn get_metrics(&self) -> String { + self.telemetry.get_metrics().await + } +}