
feat(subscriber): add log_subscriber to write real-time logs to redis #735

Merged
merged 41 commits on Feb 16, 2025

Conversation

soma00333
Contributor

@soma00333 soma00333 commented Dec 28, 2024

Overview

This pull request introduces changes to the CI/CD pipeline and adds a new subscriber service to the project. The changes include updates to the CI configuration, the addition of new workflow files, and various setup files for the subscriber service. Here are the most important changes:

CI/CD Pipeline Updates:

  • .github/workflows/ci.yml: Added steps and outputs for the new subscriber service to the CI workflow. This includes checking for changes in the subscriber files and running the ci-subscriber job if changes are detected.

New Subscriber Service:

  • .github/workflows/ci_subscriber.yml: Introduced a new CI workflow for the subscriber service, which includes linting and testing steps using golangci-lint and go test.
  • server/subscriber/: Added multiple new files and directories for the subscriber service, including a Dockerfile, Makefile, and configuration files (.env.example, .gitignore, .golangci.yml). These files set up the environment, dependencies, and build process for the subscriber service.
  • server/subscriber/README.md: Added a comprehensive README file for the subscriber service, detailing its purpose, setup instructions, and usage examples. This includes information on how the service subscribes to a Pub/Sub topic, processes logs, and stores them in Redis.

Code Changes:

Docker and Network Configuration:

  • engine/compose.yml and server/subscriber/compose.yml: Updated Docker Compose configurations to include the new reearth-flow-net network and set up services for the subscriber and Redis.

These changes collectively enhance the CI/CD pipeline and introduce a new service that improves real-time log monitoring and storage.
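
For orientation, the core flow can be sketched in a few dozen lines of Go. This is a hedged illustration, not the actual server/subscriber code: it borrows the dependency names from go.mod (cloud.google.com/go/pubsub, github.com/redis/go-redis/v9), the environment variable names from .env.example, and the log:<workflowID>:<jobID>:<timestamp> key layout discussed later in the review, while the event fields, JSON tags, timestamp format, and error handling are simplified assumptions.

package main

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"os"
	"time"

	"cloud.google.com/go/pubsub"
	"github.com/redis/go-redis/v9"
)

// logEvent mirrors the fields referenced in the review comments; the real struct
// lives in server/subscriber/pkg/log.
type logEvent struct {
	WorkflowID string    `json:"workflowID"`
	JobID      string    `json:"jobID"`
	Timestamp  time.Time `json:"timestamp"`
	LogLevel   string    `json:"logLevel"`
	Message    string    `json:"message"`
}

func main() {
	ctx := context.Background()

	// Wire up the Pub/Sub subscription and the Redis client from environment variables.
	psClient, err := pubsub.NewClient(ctx, os.Getenv("FLOW_LOG_SUBSCRIBER_PROJECT_ID"))
	if err != nil {
		log.Fatalf("pubsub client: %v", err)
	}
	sub := psClient.Subscription(os.Getenv("FLOW_LOG_SUBSCRIBER_SUBSCRIPTION_ID"))

	rdb := redis.NewClient(&redis.Options{
		Addr:     os.Getenv("FLOW_LOG_SUBSCRIBER_REDIS_ADDR"),
		Password: os.Getenv("FLOW_LOG_SUBSCRIBER_REDIS_PASSWORD"),
	})

	// Receive blocks and invokes the callback for each message delivered by Pub/Sub.
	err = sub.Receive(ctx, func(ctx context.Context, m *pubsub.Message) {
		var ev logEvent
		if err := json.Unmarshal(m.Data, &ev); err != nil {
			m.Nack() // malformed payload: hand it back to Pub/Sub
			return
		}
		key := fmt.Sprintf("log:%s:%s:%s",
			ev.WorkflowID, ev.JobID, ev.Timestamp.UTC().Format(time.RFC3339Nano))
		if err := rdb.Set(ctx, key, string(m.Data), 60*time.Minute).Err(); err != nil {
			m.Nack() // storage failure: let Pub/Sub redeliver
			return
		}
		m.Ack()
	})
	if err != nil {
		log.Fatalf("receive: %v", err)
	}
}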

What I've done

See the overview

What I haven't done

  • Add E2E tests and corresponding GitHub workflows

How I tested

I confirmed that I could retrieve logs from Redis as described in the README.

Screenshot

Which point I want you to review particularly

Please read the README and make sure it meets your expectations.

Memo

Summary by CodeRabbit

  • New Features
    • Launched a new log subscriber service that enhances message processing by integrating cloud-based messaging and caching.
    • Improved deployment orchestration with dedicated network configuration and refined volume handling.
    • Added new environment variables for configuration management of the subscriber service.
    • Introduced a logging system with structured log event management.
    • Added a new GitHub Actions workflow for linting and testing the subscriber service.
  • Chores
    • Upgraded CI processes to conditionally execute extended quality checks.
    • Updated version numbers for various packages to reflect new releases.
  • Tests
    • Expanded test coverage to ensure robust message handling and reliable error management.

Contributor

coderabbitai bot commented Dec 28, 2024

Walkthrough

This pull request integrates the subscriber component into the CI/CD pipeline and updates various configurations and source files related to a log subscriber service. The CI workflow now detects changes in subscriber files and conditionally triggers a dedicated job. Updates include changes to Docker Compose networking, environment variable configuration, and linting and testing setups, as well as new Go module code for handling Pub/Sub messages, Redis storage, and log processing, with added unit tests.

Changes

File(s) Change Summary
.github/workflows/ci.yml, .github/workflows/ci_subscriber.yml Enhanced CI workflow: added an output variable in the prepare job using changed-files, introduced a new ci-subscriber job, and integrated it into the main CI dependency chain.
engine/compose.yml Added external network reearth-flow-net; updated volume mount for gcs and connected both gcs and pubsub services to the new network; removed extra hosts from pubsub.
server/subscriber/.env.example, .gitignore, .golangci.yml, Makefile, compose.yml, go.mod Introduced environment variable definitions, new ignore rules, linting configuration, Makefile targets for testing/running/linting, a dedicated Docker Compose for subscriber, and a new Go module declaration.
server/subscriber/cmd/log_subscriber/main.go Implemented the main log subscriber service: reads env variables, sets up Pub/Sub and Redis connections, and handles graceful shutdown.
server/subscriber/internal/adapter/pubsub/... Added Pub/Sub abstraction: new Message, Subscription interfaces with concrete implementations (including message acknowledgment) and corresponding unit tests.
server/subscriber/internal/infrastructure/storage.go, server/subscriber/internal/usecase/gateway/log_storage.go, server/subscriber/internal/usecase/interactor/log_subscriber.go, server/subscriber/internal/usecase/interactor/log_subscriber_test.go Introduced the log subscriber use case along with a storage gateway interface and its implementation to save log events to Redis, supported by unit tests.
server/subscriber/pkg/log/log.go Added a logging package defining LogLevel constants, a LogEvent struct, and a constructor with input validation.
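
As a hedged sketch of what that pkg/log package might look like (the level string values, JSON tags, and exact validation rules are assumptions based on the review comments, and the optional pointer field mentioned later in the review is omitted):

package log

import (
	"fmt"
	"time"
)

// LogLevel enumerates the severities the subscriber accepts.
type LogLevel string

const (
	LogLevelError LogLevel = "ERROR"
	LogLevelWarn  LogLevel = "WARN"
	LogLevelInfo  LogLevel = "INFO"
	LogLevelDebug LogLevel = "DEBUG"
	LogLevelTrace LogLevel = "TRACE"
)

// LogEvent is one log line published for a workflow job.
type LogEvent struct {
	WorkflowID string    `json:"workflowID"`
	JobID      string    `json:"jobID"`
	Timestamp  time.Time `json:"timestamp"`
	LogLevel   LogLevel  `json:"logLevel"`
	Message    string    `json:"message"`
}

// NewLogEvent validates required fields and the level before constructing an event.
func NewLogEvent(workflowID, jobID string, ts time.Time, level LogLevel, msg string) (*LogEvent, error) {
	if workflowID == "" || jobID == "" {
		return nil, fmt.Errorf("workflowID and jobID are required")
	}
	switch level {
	case LogLevelError, LogLevelWarn, LogLevelInfo, LogLevelDebug, LogLevelTrace:
	default:
		return nil, fmt.Errorf("invalid log level: %s", level)
	}
	return &LogEvent{WorkflowID: workflowID, JobID: jobID, Timestamp: ts, LogLevel: level, Message: msg}, nil
}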

Sequence Diagram(s)

sequenceDiagram
    participant CA as GitHub Actions
    participant P as Prepare Job
    participant CS as ci-subscriber Job
    participant CI as Main CI Job

    CA->>P: Trigger workflow on push/PR
    P->>CA: Run changed-files check for subscriber
    alt Subscriber changes detected
        P->>CS: Set output variable & trigger ci-subscriber
        CS->>CA: Execute tests and linting for subscriber
    else No changes detected
    end
    CI->>CA: Wait for ci-subscriber job to complete
sequenceDiagram
    participant LS as Log Subscriber (main)
    participant PS as Pub/Sub Adapter
    participant R as Redis Client
    participant UC as LogSubscriberUseCase
    participant S as Subscriber Listener

    LS->>LS: Read environment variables
    LS->>PS: Initialize Pub/Sub subscription adapter
    LS->>R: Establish Redis connection
    LS->>UC: Create use case with storage dependency
    LS->>S: Start listening for messages
    S->>PS: Wait for incoming message
    PS->>S: Deliver message data
    S->>UC: ProcessLogEvent(message)
    alt Processing succeeds
        UC->>R: Save log event to Redis
        S->>PS: Ack message
    else Error encountered
        S->>PS: Nack message
    end
    LS->>LS: Handle graceful shutdown on termination signal

Suggested reviewers

  • pyshx

Poem

I'm a rabbit, swift on my feet,
Hopping through code with a joyful beat.
CI pipelines and logs in a merry dance,
Every change makes my heart prance!
With Docker dreams and Go so bright,
I nibble bugs away by night.
Keep hopping onward—code takes flight! 🐇


📜 Recent review details

Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between c19af53 and d424b9e.

⛔ Files ignored due to path filters (1)
  • engine/Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (2)
  • engine/Cargo.toml (1 hunks)
  • engine/plateau-gis-quality-checker/src-tauri/Cargo.toml (1 hunks)
✅ Files skipped from review due to trivial changes (1)
  • engine/plateau-gis-quality-checker/src-tauri/Cargo.toml
⏰ Context from checks skipped due to timeout of 90000ms (2)
  • GitHub Check: ci-engine / ci
  • GitHub Check: ci-api / ci-api-test
🔇 Additional comments (1)
engine/Cargo.toml (1)

21-21:

❓ Verification inconclusive

LGTM! Version bump aligns with semantic versioning.

The version update from 0.0.5 to 0.0.6 is appropriate for adding new functionality (log subscriber service).

Let's verify that the version update is reflected in other related files:


🏁 Script executed:

#!/bin/bash
# Description: Check for version references in other files
# Note: Focusing on common locations where version numbers might be referenced

# Search for version strings in Cargo.toml files
echo "Checking Cargo.toml files for version references..."
fd -e toml -x rg -l "version.*0\.0\.[56]" {}

# Check for version strings in documentation
echo "Checking documentation files for version references..."
fd -e md -e txt -x rg -l "version.*0\.0\.[56]" {}

Length of output: 354


Version bump update approved for engine/Cargo.toml.

The version update from 0.0.5 to 0.0.6 is appropriate for the addition of the log subscriber service, aligning with semantic versioning. The automated search confirms that the version bump is correctly applied in the engine/Cargo.toml file (line 21). However, the search in documentation files did not return any references to this version number. Please manually verify if there are any documentation or other related files that also require an update to reflect this change.

  • File: engine/Cargo.toml (Line 21)
  • Version Change: 0.0.5 → 0.0.6


netlify bot commented Dec 28, 2024

Deploy Preview for reearth-flow canceled.

Name Link
🔨 Latest commit d424b9e
🔍 Latest deploy log https://app.netlify.com/sites/reearth-flow/deploys/67aec6c11f51ab00087ef062

@github-actions github-actions bot added the cicd label Jan 11, 2025
@soma00333 soma00333 marked this pull request as ready for review January 11, 2025 13:58
Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 13

🧹 Nitpick comments (25)
log_subscriber/internal/usecase/gateway/log_storage.go (1)

9-12: Add method documentation and consider additional interface methods.

The interface design is clean and follows the single responsibility principle. Consider these improvements:

  1. Add method documentation to clarify the expected behavior, error conditions, and any timing guarantees
  2. Consider adding batch operations for better performance under high load
  3. Consider adding health check methods to verify storage connectivity

Example documentation:

// LogStorage defines operations for persisting log events to different storage backends
type LogStorage interface {
    // SaveToRedis persists the log event to Redis
    // Returns error if the operation fails or context is canceled
    SaveToRedis(ctx context.Context, event *domainLog.LogEvent) error
    
    // SaveToGCS persists the log event to Google Cloud Storage
    // Returns error if the operation fails or context is canceled
    SaveToGCS(ctx context.Context, event *domainLog.LogEvent) error
}
log_subscriber/internal/adapter/pubsub/message.go (1)

7-11: Consider extending Message interface with additional methods.

The current interface covers basic operations but could be enhanced with:

  1. Access to message attributes
  2. Message ID for tracing
  3. Publish time for timing analysis

Example extension:

type Message interface {
    Data() []byte
    Attributes() map[string]string
    ID() string
    PublishTime() time.Time
    Ack()
    Nack()
}
log_subscriber/internal/adapter/pubsub/subscription.go (1)

9-11: Consider adding configuration options to Subscription interface.

The interface could benefit from additional methods to configure subscription behavior:

  1. Configure message acknowledgement deadline
  2. Set max outstanding messages
  3. Configure retry policy

Example extension:

type Subscription interface {
    Receive(ctx context.Context, f func(context.Context, Message)) error
    SetReceiveSettings(settings ReceiveSettings) error
    Stop() error // Gracefully stop receiving messages
}

type ReceiveSettings struct {
    MaxOutstandingMessages int
    MaxExtensionPeriod    time.Duration
    NumGoroutines        int
}
log_subscriber/internal/infrastructure/storage.go (1)

12-15: Consider adding metrics and monitoring.

To improve observability, consider adding the following (a sketch of the first item follows the list):

  1. Prometheus metrics for storage operations
  2. Tracing integration
  3. Structured logging
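
As a sketch of the first suggestion only: wrap each storage call in a small timing helper, assuming github.com/prometheus/client_golang were added as a dependency (it is not part of this PR).

package infrastructure

import (
	"context"
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

// saveDuration records how long each persistence call takes, labeled by backend and outcome.
var saveDuration = prometheus.NewHistogramVec(
	prometheus.HistogramOpts{
		Name: "log_subscriber_save_duration_seconds",
		Help: "Time spent persisting a log event.",
	},
	[]string{"backend", "status"},
)

func init() {
	// Registering on the default registry is an assumption; the project may use its own.
	prometheus.MustRegister(saveDuration)
}

// observeSave wraps a storage call with a timer and success/error labeling.
func observeSave(ctx context.Context, backend string, fn func(context.Context) error) error {
	start := time.Now()
	err := fn(ctx)
	status := "success"
	if err != nil {
		status = "error"
	}
	saveDuration.WithLabelValues(backend, status).Observe(time.Since(start).Seconds())
	return err
}

The storage implementation could then call, for example, observeSave(ctx, "redis", func(ctx context.Context) error { return s.redis.SaveLogToRedis(ctx, event) }).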
log_subscriber/internal/usecase/interactor/log_subscriber.go (3)

31-39: Consider implementing a rollback mechanism for partial failures

When saving to Redis succeeds but saving to GCS fails, the Redis entry remains without its corresponding GCS backup. This could lead to data inconsistency.

Consider implementing a rollback mechanism or using the outbox pattern to ensure data consistency across both storage systems.
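
A minimal sketch of the compensating-delete variant, written as a standalone function so it is self-contained; LogEvent and logStorage stand in for the PR's domainLog.LogEvent and gateway.LogStorage, and DeleteFromRedis is a hypothetical method that would have to be added to the real gateway.

package interactor

import (
	"context"
	"fmt"
)

type LogEvent struct {
	WorkflowID string
	JobID      string
}

type logStorage interface {
	SaveToRedis(ctx context.Context, event *LogEvent) error
	SaveToGCS(ctx context.Context, event *LogEvent) error
	DeleteFromRedis(ctx context.Context, event *LogEvent) error // hypothetical compensation hook
}

// processWithCompensation writes to Redis first, then GCS; if the GCS write fails it
// deletes the Redis entry so the two stores do not drift apart. A full outbox pattern
// would instead persist the event once and relay it to both backends asynchronously.
func processWithCompensation(ctx context.Context, s logStorage, event *LogEvent) error {
	if event == nil {
		return fmt.Errorf("event is nil")
	}
	if err := s.SaveToRedis(ctx, event); err != nil {
		return fmt.Errorf("failed to write to Redis: %w", err)
	}
	if err := s.SaveToGCS(ctx, event); err != nil {
		if delErr := s.DeleteFromRedis(ctx, event); delErr != nil {
			return fmt.Errorf("failed to write to GCS (%w); Redis rollback also failed: %v", err, delErr)
		}
		return fmt.Errorf("failed to write to GCS: %w", err)
	}
	return nil
}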


26-42: Add retry mechanism for transient failures

Storage operations can fail due to temporary network issues or service unavailability. Adding a retry mechanism with backoff would improve reliability.

Consider using a retry package like github.com/cenkalti/backoff/v4 to implement retries:

 func (u *logSubscriberUseCase) ProcessLogEvent(ctx context.Context, event *domainLog.LogEvent) error {
     if event == nil {
         return fmt.Errorf("event is nil")
     }
 
+    b := backoff.NewExponentialBackOff()
+    b.MaxElapsedTime = 1 * time.Minute
+
     // 1. Write to Redis
-    if err := u.storage.SaveToRedis(ctx, event); err != nil {
+    if err := backoff.Retry(func() error {
+        return u.storage.SaveToRedis(ctx, event)
+    }, b); err != nil {
         return fmt.Errorf("failed to write to Redis: %w", err)
     }

26-42: Add logging for successful operations

Currently, there's no logging for successful operations, which makes it harder to monitor the system's behavior and performance.

Consider adding debug-level logging for successful operations:

+import "log"

 func (u *logSubscriberUseCase) ProcessLogEvent(ctx context.Context, event *domainLog.LogEvent) error {
     if event == nil {
         return fmt.Errorf("event is nil")
     }
 
     // 1. Write to Redis
     if err := u.storage.SaveToRedis(ctx, event); err != nil {
         return fmt.Errorf("failed to write to Redis: %w", err)
     }
+    log.Printf("Successfully saved event %s to Redis", event.WorkflowID)
 
     // 2. Write to GCS
     if err := u.storage.SaveToGCS(ctx, event); err != nil {
         return fmt.Errorf("failed to write to GCS: %w", err)
     }
+    log.Printf("Successfully saved event %s to GCS", event.WorkflowID)
log_subscriber/pkg/log/log.go (1)

8-20: Add method to validate LogLevel string

Add a method to validate and convert string to LogLevel, useful when receiving log level from external sources.

Consider adding:

func ParseLogLevel(level string) (LogLevel, error) {
    switch LogLevel(level) {
    case LogLevelError, LogLevelWarn, LogLevelInfo, LogLevelDebug, LogLevelTrace:
        return LogLevel(level), nil
    default:
        return "", fmt.Errorf("invalid log level: %s", level)
    }
}
log_subscriber/internal/infrastructure/redis/redis.go (2)

34-37: Make TTL configurable

The 60-minute TTL is hardcoded, making it inflexible for different environments or requirements.

Consider making TTL configurable:

 type RedisStorage struct {
     client RedisClient
+    ttl    time.Duration
 }
 
-func NewRedisStorage(client RedisClient) *RedisStorage {
-    return &RedisStorage{client: client}
+func NewRedisStorage(client RedisClient, ttl time.Duration) *RedisStorage {
+    return &RedisStorage{
+        client: client,
+        ttl:    ttl,
+    }
 }

27-27: Consider using a more distributed key pattern

The current key pattern might cause hot keys if there are many logs for the same workflow/job at similar timestamps.

Consider adding a hash or random component to distribute keys more evenly across Redis nodes:

-    key := fmt.Sprintf("log:%s:%s:%s", event.WorkflowID, event.JobID, event.Timestamp.UTC().Format(layoutWithMillis))
+    hash := fnv.New32a()
+    hash.Write([]byte(fmt.Sprintf("%s:%s", event.WorkflowID, event.JobID)))
+    shard := hash.Sum32() % 16
+    key := fmt.Sprintf("log:%d:%s:%s:%s", shard, event.WorkflowID, event.JobID, event.Timestamp.UTC().Format(layoutWithMillis))
log_subscriber/internal/infrastructure/gcs/gcs.go (1)

22-27: Add input validation in constructor.

Consider validating the input parameters to ensure they are not nil/empty.

 func NewGCSStorage(client GCSClient, bucketName string) *GCSStorage {
+	if client == nil {
+		panic("GCSClient cannot be nil")
+	}
+	if bucketName == "" {
+		panic("bucketName cannot be empty")
+	}
 	return &GCSStorage{
 		client:     client,
 		bucketName: bucketName,
 	}
 }
log_subscriber/internal/infrastructure/redis/resit_test.go (1)

55-74: Add test case for nil event.

Consider adding a test case to validate the behavior when a nil event is passed.

+	t.Run("Error: nil event", func(t *testing.T) {
+		err := rStorage.SaveLogToRedis(ctx, nil)
+		assert.Error(t, err)
+		assert.Contains(t, err.Error(), "event cannot be nil")
+		mClient.AssertNotCalled(t, "Set")
+	})
log_subscriber/internal/usecase/interactor/log_subscriber_test.go (1)

29-102: Add test case for concurrent access.

The current test suite covers the basic scenarios well. Consider adding a test case to verify the behavior under concurrent access, as the use case might be called concurrently in production.

+	t.Run("Concurrent access", func(t *testing.T) {
+		const numGoroutines = 10
+		event := &domainLog.LogEvent{
+			WorkflowID: "wf-123",
+			JobID:      "job-123",
+			Timestamp:  time.Now(),
+			LogLevel:   domainLog.LogLevelInfo,
+			Message:    "Test message",
+		}
+
+		mockStorage.
+			On("SaveToRedis", ctx, event).
+			Return(nil).Times(numGoroutines)
+		mockStorage.
+			On("SaveToGCS", ctx, event).
+			Return(nil).Times(numGoroutines)
+
+		var wg sync.WaitGroup
+		for i := 0; i < numGoroutines; i++ {
+			wg.Add(1)
+			go func() {
+				defer wg.Done()
+				err := u.ProcessLogEvent(ctx, event)
+				assert.NoError(t, err)
+			}()
+		}
+		wg.Wait()
+
+		mockStorage.AssertExpectations(t)
+	})
log_subscriber/cmd/log_subscriber/main.go (1)

99-107: Improve error handling in subscriber goroutine.

Consider adding more context to the error and implementing a backoff mechanism for retrying.

 	go func() {
 		defer wg.Done()
+		backoff := time.Second
+		maxBackoff := time.Minute
+		for {
+			select {
+			case <-ctx.Done():
+				return
+			default:
+				if err := subscriber.StartListening(ctx); err != nil {
+					log.Printf("[log_subscriber] Subscriber error: %v. Retrying in %v...", err, backoff)
+					time.Sleep(backoff)
+					backoff = time.Duration(math.Min(float64(backoff*2), float64(maxBackoff)))
+					continue
+				}
+				return
+			}
+		}
-		if err := subscriber.StartListening(ctx); err != nil {
-			log.Printf("[log_subscriber] Subscriber error: %v", err)
-			cancel()
-		}
 	}()
log_subscriber/internal/adapter/pubsub/subscriber_test.go (3)

67-68: Consider removing .Maybe() from success test expectations

In the success test case, we expect Ack() to be called but have a .Maybe() expectation for Nack(). Since this is a success case, we should be certain that Nack() is never called.

 mMsg.On("Ack").Return().Once()
-mMsg.On("Nack").Return().Maybe()

107-108: Consider removing .Maybe() from error test expectations

In the error test case, we expect Nack() to be called but have a .Maybe() expectation for Ack(). Since this is an error case, we should be certain that Ack() is never called.

-mMsg.On("Ack").Return().Maybe()
 mMsg.On("Nack").Return().Once()

135-136: Consider removing .Maybe() from invalid JSON test expectations

In the invalid JSON test case, we expect Nack() to be called but have a .Maybe() expectation for Ack(). Since this is an error case, we should be certain that Ack() is never called.

-mMsg.On("Ack").Return().Maybe()
 mMsg.On("Nack").Return().Once()
log_subscriber/.golangci.yml (1)

1-7: Consider enabling additional linters for enhanced code quality

While gofmt and goimports handle formatting, consider enabling additional linters for better code quality:

  • errcheck: Ensures error handling
  • gosimple: Simplifies code
  • govet: Reports suspicious constructs
  • staticcheck: Performs static analysis
 linters:
   enable:
     - gofmt
     - goimports
+    - errcheck
+    - gosimple
+    - govet
+    - staticcheck
log_subscriber/Makefile (1)

1-8: Consider adding more essential make targets

While the current targets are good, consider adding:

  • build: For local builds
  • clean: For cleanup
  • help: For target documentation
  • .PHONY: To mark targets as non-file targets
+.PHONY: test run lint build clean
+
+help:
+	@echo "Available targets:"
+	@echo "  test   - Run tests with race detection"
+	@echo "  run    - Run with docker-compose"
+	@echo "  lint   - Run linter with auto-fix"
+	@echo "  build  - Build locally"
+	@echo "  clean  - Clean build artifacts"
+
 test:
 	go test -race -short -v ./...
 
 run:
 	docker-compose up --build -d
 
 lint:
 	golangci-lint run --fix
+
+build:
+	go build -o bin/subscriber ./cmd/subscriber
+
+clean:
+	rm -rf bin/
engine/compose.yml (2)

1-3: Document the purpose of external network

Consider adding a comment explaining why this network needs to be external and how it should be created.

 networks:
   reearth-flow-net:
     external: true
+    # This network should be created manually using:
+    # docker network create reearth-flow-net
+    # It's shared between engine and log-subscriber services

24-25: Consider adding network aliases

For better service discovery, consider adding network aliases to the services.

     networks:
       - reearth-flow-net
+      aliases:
+        - pubsub.reearth-flow
log_subscriber/Dockerfile.subscriber (2)

1-4: Consider pinning the exact Go version for better reproducibility.

The Dockerfile uses golang:1.22-alpine which could potentially pull in patch updates automatically. For production builds, consider pinning the exact version (e.g., golang:1.22.0-alpine).

-FROM golang:1.22-alpine AS build
+FROM golang:1.22.0-alpine AS build

24-30: Consider adding a non-root user for better security.

While using a scratch image is great for minimizing the attack surface, it's recommended to add a non-root user for running the application.

FROM scratch

+# Add non-root user (uid:gid)
+USER 10001:10001

COPY --from=build /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/
COPY --from=build /reearth-flow/reearth-flow-subscriber /reearth-flow/reearth-flow-subscriber
log_subscriber/compose.yml (1)

7-13: Consider adding Redis persistence and security configurations.

The Redis configuration could be enhanced with:

  • Volume mount for data persistence
  • Password protection
  • Memory limits
 redis:
   image: redis:7
   container_name: log-subscriber-redis
   restart: always
+  command: redis-server --requirepass ${REDIS_PASSWORD}
+  volumes:
+    - redis-data:/data
   ports:
     - "6379:6379"
+  environment:
+    - REDIS_PASSWORD=${REDIS_PASSWORD}
+  deploy:
+    resources:
+      limits:
+        memory: 256M
   networks:
     - reearth-flow-net
.github/workflows/ci_log_subscriber.yml (1)

37-38: Consider adding test coverage threshold.

Add a minimum coverage threshold to maintain code quality standards.

-        run: go test ./... -v -race -coverprofile=coverage.txt -covermode=atomic -timeout 10m
+        run: |
+          go test ./... -v -race -coverprofile=coverage.txt -covermode=atomic -timeout 10m
+          go tool cover -func=coverage.txt | awk 'END{if($3+0 < 80.0){print "Test coverage " $3 "% is below threshold of 80%"; exit 1}}'
📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 8229a48 and 7e28570.

⛔ Files ignored due to path filters (5)
  • engine/worker/README.md is excluded by !**/*.md
  • go.work is excluded by !**/*.work
  • go.work.sum is excluded by !**/*.sum
  • log_subscriber/README.md is excluded by !**/*.md
  • log_subscriber/go.sum is excluded by !**/*.sum
📒 Files selected for processing (21)
  • .github/workflows/ci.yml (5 hunks)
  • .github/workflows/ci_log_subscriber.yml (1 hunks)
  • engine/compose.yml (2 hunks)
  • log_subscriber/.golangci.yml (1 hunks)
  • log_subscriber/Dockerfile.subscriber (1 hunks)
  • log_subscriber/Makefile (1 hunks)
  • log_subscriber/cmd/log_subscriber/main.go (1 hunks)
  • log_subscriber/compose.yml (1 hunks)
  • log_subscriber/go.mod (1 hunks)
  • log_subscriber/internal/adapter/pubsub/message.go (1 hunks)
  • log_subscriber/internal/adapter/pubsub/subscriber.go (1 hunks)
  • log_subscriber/internal/adapter/pubsub/subscriber_test.go (1 hunks)
  • log_subscriber/internal/adapter/pubsub/subscription.go (1 hunks)
  • log_subscriber/internal/infrastructure/gcs/gcs.go (1 hunks)
  • log_subscriber/internal/infrastructure/redis/redis.go (1 hunks)
  • log_subscriber/internal/infrastructure/redis/resit_test.go (1 hunks)
  • log_subscriber/internal/infrastructure/storage.go (1 hunks)
  • log_subscriber/internal/usecase/gateway/log_storage.go (1 hunks)
  • log_subscriber/internal/usecase/interactor/log_subscriber.go (1 hunks)
  • log_subscriber/internal/usecase/interactor/log_subscriber_test.go (1 hunks)
  • log_subscriber/pkg/log/log.go (1 hunks)
✅ Files skipped from review due to trivial changes (1)
  • log_subscriber/go.mod
🧰 Additional context used
🪛 actionlint (1.7.4)
.github/workflows/ci_log_subscriber.yml

13-13: the runner of "actions/checkout@v3" action is too old to run on GitHub Actions. update the action's version to fix this issue

(action)


15-15: the runner of "actions/setup-go@v4" action is too old to run on GitHub Actions. update the action's version to fix this issue

(action)


30-30: the runner of "actions/checkout@v3" action is too old to run on GitHub Actions. update the action's version to fix this issue

(action)


32-32: the runner of "actions/setup-go@v4" action is too old to run on GitHub Actions. update the action's version to fix this issue

(action)


40-40: the runner of "codecov/codecov-action@v2" action is too old to run on GitHub Actions. update the action's version to fix this issue

(action)

🪛 yamllint (1.35.1)
log_subscriber/compose.yml

[error] 32-32: no new line character at the end of file

(new-line-at-end-of-file)

🔇 Additional comments (3)
log_subscriber/internal/infrastructure/redis/resit_test.go (1)

29-53: LGTM! Well-structured test with deterministic assertions.

The test effectively validates both the key format and value format with a fixed timestamp.

log_subscriber/Dockerfile.subscriber (1)

16-21: LGTM! Good build optimization flags.

The build configuration includes several good practices:

  • Disabling CGO for better portability
  • Using build tags for conditional compilation
  • Stripping debug information for smaller binary size
  • Using trimpath for reproducible builds
.github/workflows/ci.yml (1)

12-12: LGTM! Clean integration of log_subscriber CI.

The changes properly integrate the log_subscriber component into the main CI workflow, following the established patterns for other components.

Also applies to: 33-42, 78-81, 98-98

@soma00333 soma00333 requested a review from pyshx January 11, 2025 14:23
Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

🧹 Nitpick comments (3)
log_subscriber/.env.example (2)

6-6: Add a comment about Redis password security.

The empty Redis password in the example file could lead to security issues if copied directly to production. Consider adding a comment to remind users to set a strong password in their actual .env file.

-FLOW_LOG_SUBSCRIBER_REDIS_PASSWORD=
+# Make sure to set a strong password in production
+FLOW_LOG_SUBSCRIBER_REDIS_PASSWORD=

1-7: Add descriptive comments for environment variables.

Consider adding comments to describe each environment variable's purpose and any specific requirements or constraints. This would make the example file more self-documenting.

+# Pub/Sub emulator connection settings
 PUBSUB_EMULATOR_HOST=pubsub:8085
+
+# GCS emulator connection settings
 STORAGE_EMULATOR_HOST=http://gcs:4443
+
+# Flow Log Subscriber configuration
 FLOW_LOG_SUBSCRIBER_PROJECT_ID=local-project
 FLOW_LOG_SUBSCRIBER_SUBSCRIPTION_ID=flow-log-stream-topic-sub
+
+# Redis connection settings
+# Note: Ensure password is set in production
 FLOW_LOG_SUBSCRIBER_REDIS_ADDR=redis:6379
 FLOW_LOG_SUBSCRIBER_REDIS_PASSWORD=
+
+# GCS bucket configuration
 FLOW_LOG_SUBSCRIBER_GCS_BUCKET_NAME=reearth-flow-oss-bucket
log_subscriber/compose.yml (1)

32-32: Add newline at end of file.

Add a newline character at the end of the file to comply with YAML best practices.

📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 7e28570 and 9aaa478.

⛔ Files ignored due to path filters (1)
  • log_subscriber/README.md is excluded by !**/*.md
📒 Files selected for processing (3)
  • log_subscriber/.env.example (1 hunks)
  • log_subscriber/.gitignore (1 hunks)
  • log_subscriber/compose.yml (1 hunks)
✅ Files skipped from review due to trivial changes (1)
  • log_subscriber/.gitignore
⏰ Context from checks skipped due to timeout of 90000ms (3)
  • GitHub Check: ci-engine / ci
  • GitHub Check: ci-api / ci-api-lint
  • GitHub Check: ci-api / ci-api-test
🔇 Additional comments (3)
log_subscriber/.env.example (1)

1-2: Verify emulator port configurations.

Let's verify that the ports used for the emulators (8085 for Pub/Sub and 4443 for GCS) align with the Docker configurations and are not in conflict with other services.

✅ Verification successful

Port configurations are correctly aligned

The emulator ports in .env.example perfectly match the Docker Compose configuration:

  • Pub/Sub emulator port 8085 is properly mapped in the pubsub service
  • GCS emulator port 4443 is correctly configured in the gcs service with HTTP scheme
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check for port configurations in Docker and compose files

# Check Pub/Sub port configuration
echo "Checking Pub/Sub port configuration..."
rg -l "8085" --type yaml

# Check GCS port configuration
echo "Checking GCS port configuration..."
rg -l "4443" --type yaml

Length of output: 451


Script:

#!/bin/bash
# Check the service configurations in compose.yml
echo "Content of engine/compose.yml:"
cat engine/compose.yml

Length of output: 793

log_subscriber/compose.yml (2)

1-4: LGTM! Network configuration is properly set up.

The external network configuration is correct for sharing with other services like GCS and Pub/Sub emulators.


26-32: Add default values for environment variables.

While using environment variables is good, provide default values for development ease.

Apply these changes:

     environment:
-      - PUBSUB_EMULATOR_HOST=${PUBSUB_EMULATOR_HOST}
-      - STORAGE_EMULATOR_HOST=${STORAGE_EMULATOR_HOST}
-      - FLOW_LOG_SUBSCRIBER_PROJECT_ID=${FLOW_LOG_SUBSCRIBER_PROJECT_ID}
-      - FLOW_LOG_SUBSCRIBER_SUBSCRIPTION_ID=${FLOW_LOG_SUBSCRIBER_SUBSCRIPTION_ID}
-      - FLOW_LOG_SUBSCRIBER_REDIS_ADDR=${FLOW_LOG_SUBSCRIBER_REDIS_ADDR}
-      - FLOW_LOG_SUBSCRIBER_REDIS_PASSWORD=${FLOW_LOG_SUBSCRIBER_REDIS_PASSWORD}
-      - FLOW_LOG_SUBSCRIBER_GCS_BUCKET_NAME=${FLOW_LOG_SUBSCRIBER_GCS_BUCKET_NAME}
+      - PUBSUB_EMULATOR_HOST=${PUBSUB_EMULATOR_HOST:-pubsub:8085}
+      - STORAGE_EMULATOR_HOST=${STORAGE_EMULATOR_HOST:-http://gcs:4443}
+      - FLOW_LOG_SUBSCRIBER_PROJECT_ID=${FLOW_LOG_SUBSCRIBER_PROJECT_ID:-local-project}
+      - FLOW_LOG_SUBSCRIBER_SUBSCRIPTION_ID=${FLOW_LOG_SUBSCRIBER_SUBSCRIPTION_ID:-flow-log-stream-topic-sub}
+      - FLOW_LOG_SUBSCRIBER_REDIS_ADDR=${FLOW_LOG_SUBSCRIBER_REDIS_ADDR:-redis:6379}
+      - FLOW_LOG_SUBSCRIBER_REDIS_PASSWORD=${FLOW_LOG_SUBSCRIBER_REDIS_PASSWORD:-}
+      - FLOW_LOG_SUBSCRIBER_GCS_BUCKET_NAME=${FLOW_LOG_SUBSCRIBER_GCS_BUCKET_NAME:-reearth-flow-oss-bucket}

@soma00333 soma00333 enabled auto-merge (squash) January 11, 2025 14:27
Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

🧹 Nitpick comments (4)
log_subscriber/internal/adapter/pubsub/message.go (1)

7-11: Add documentation for the Message interface and its methods.

Consider adding documentation to explain the purpose of the interface and its methods. This will improve maintainability and help other developers understand the expected behavior.

+// Message represents a Pub/Sub message that can be acknowledged or rejected
 type Message interface {
+    // Data returns the message payload as a byte slice
     Data() []byte
+    // Ack acknowledges the message, indicating successful processing
     Ack()
+    // Nack rejects the message, indicating a processing failure
     Nack()
 }
log_subscriber/internal/adapter/pubsub/subscriber_test.go (3)

50-87: Consider adding timeout context and using constants for test data.

While the test is well-structured, consider these improvements:

  1. Use context with timeout to prevent potential test hangs
  2. Extract test data into constants for reuse across tests
+const (
+    testWorkflowID = "workflow-123"
+    testJobID      = "job-abc"
+    testMessage    = "Hello from test"
+)

 func TestSubscriber_StartListening_Success(t *testing.T) {
-    ctx := context.Background()
+    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+    defer cancel()
     
     // ... rest of the test

89-124: Maintain consistency in test data across test cases.

The test data in this error case is minimal compared to the success case. Consider using the same test data structure for consistency and completeness.

     testEvent := domainLog.LogEvent{
-        WorkflowID: "wf",
-        JobID:      "job",
+        WorkflowID: testWorkflowID,
+        JobID:      testJobID,
+        Timestamp:  time.Now(),
+        LogLevel:   domainLog.LogLevelInfo,
+        Message:    testMessage,
     }

126-154: Use more realistic invalid JSON test data.

The current invalid JSON test data ({ "invalid": ??? }) could be more realistic. Consider using a malformed version of the actual expected JSON structure.

-    mMsg.data = []byte(`{ "invalid": ??? }`)
+    mMsg.data = []byte(`{
+        "workflowID": "workflow-123",
+        "jobID": "job-abc",
+        "timestamp": "invalid-time-format",
+        "logLevel": "invalid-level",
+        "message": 123
+    }`)
📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 9aaa478 and 6fa0d93.

📒 Files selected for processing (4)
  • .github/workflows/ci_log_subscriber.yml (1 hunks)
  • log_subscriber/cmd/log_subscriber/main.go (1 hunks)
  • log_subscriber/internal/adapter/pubsub/message.go (1 hunks)
  • log_subscriber/internal/adapter/pubsub/subscriber_test.go (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
  • log_subscriber/cmd/log_subscriber/main.go
  • .github/workflows/ci_log_subscriber.yml
⏰ Context from checks skipped due to timeout of 90000ms (1)
  • GitHub Check: ci-engine / ci
🔇 Additional comments (2)
log_subscriber/internal/adapter/pubsub/message.go (1)

13-34: LGTM! Clean implementation of the Message interface.

The implementation correctly wraps the Google Pub/Sub message, includes proper nil checking in the constructor, and follows the adapter pattern.

log_subscriber/internal/adapter/pubsub/subscriber_test.go (1)

17-48: LGTM! Well-structured mock implementations.

The mocks are properly implemented using testify/mock and cover all required behaviors for testing.

Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

🧹 Nitpick comments (1)
.github/workflows/ci.yml (1)

33-42: Fix indentation in the changed files configuration.

The configuration is correct, but there's a minor formatting issue. The files field should be indented with 10 spaces to match the formatting of other similar blocks in the file.

       with:
-            files: |
+          files: |
             log_subscriber/**
             .github/workflows/ci.yml
             .github/workflows/ci_log_subscriber.yml
             CHANGELOG.md
🧰 Tools
🪛 yamllint (1.35.1)

[warning] 37-37: wrong indentation: expected 10 but found 12

(indentation)

📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 6fa0d93 and 0117c51.

📒 Files selected for processing (1)
  • .github/workflows/ci.yml (5 hunks)
🧰 Additional context used
🪛 yamllint (1.35.1)
.github/workflows/ci.yml

[warning] 37-37: wrong indentation: expected 10 but found 12

(indentation)

⏰ Context from checks skipped due to timeout of 90000ms (3)
  • GitHub Check: ci-engine / ci
  • GitHub Check: ci-api / ci-api-lint
  • GitHub Check: ci-api / ci-api-test
🔇 Additional comments (3)
.github/workflows/ci.yml (3)

12-12: LGTM! Output variable properly integrated.

The log_subscriber output variable follows the established pattern and is correctly integrated with other component outputs.


98-98: LGTM! Dependencies and secrets properly configured.

The changes correctly:

  • Add the log subscriber job as a dependency for the main CI job
  • Ensure proper secrets inheritance for the workflow

Also applies to: 158-158


78-81: LGTM! CI job properly configured.

The ci-log-subscriber job is correctly integrated with appropriate dependencies and conditions.

✅ Verification successful

Workflow file verification successful

The referenced workflow file .github/workflows/ci_log_subscriber.yml exists and is properly configured with lint and test jobs for the log subscriber component.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Verify that the referenced workflow file exists
if [ ! -f ".github/workflows/ci_log_subscriber.yml" ]; then
  echo "Error: ci_log_subscriber.yml workflow file is missing"
  exit 1
fi

Length of output: 56


Script:

#!/bin/bash
# List workflow files and check for ci_log_subscriber.yml
echo "Checking workflow files:"
ls -la .github/workflows/
echo -e "\nAttempting to display workflow content:"
cat .github/workflows/ci_log_subscriber.yml

Length of output: 2362

@soma00333
Contributor Author

I confirmed that the subscriber is working by following the README.

@soma00333 soma00333 changed the title from "feat(log-subscriber): Create subscriber" to "feat(log-subscriber): Add log_subscriber to write real-time logs to Redis" on Feb 1, 2025
Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 9

🧹 Nitpick comments (10)
server/subscriber/internal/adapter/pubsub/message.go (1)

28-34: Consider adding error handling for Ack/Nack operations.

The Ack and Nack methods might fail in practice. Consider returning an error to allow proper error handling by callers.

-func (r *realMessage) Ack() {
+func (r *realMessage) Ack() error {
-	r.msg.Ack()
+	return r.msg.Ack()
}

-func (r *realMessage) Nack() {
+func (r *realMessage) Nack() error {
-	r.msg.Nack()
+	return r.msg.Nack()
}
server/subscriber/internal/usecase/interactor/log_subscriber.go (1)

25-33: Enhance error handling with more specific error types.

Consider defining custom error types for better error handling and debugging.

+var (
+    ErrNilEvent = fmt.Errorf("event is nil")
+    ErrRedisWrite = fmt.Errorf("failed to write to Redis")
+)

 func (u *logSubscriberUseCase) ProcessLogEvent(ctx context.Context, event *domainLog.LogEvent) error {
     if event == nil {
-        return fmt.Errorf("event is nil")
+        return ErrNilEvent
     }
     if err := u.storage.SaveToRedis(ctx, event); err != nil {
-        return fmt.Errorf("failed to write to Redis: %w", err)
+        return fmt.Errorf("%w: %v", ErrRedisWrite, err)
     }
     return nil
 }
server/subscriber/pkg/log/log.go (1)

8-20: Add LogLevel validation method.

Consider adding a method to validate log levels.

+var validLogLevels = map[LogLevel]bool{
+    LogLevelError: true,
+    LogLevelWarn:  true,
+    LogLevelInfo:  true,
+    LogLevelDebug: true,
+    LogLevelTrace: true,
+}

+func (l LogLevel) IsValid() bool {
+    return validLogLevels[l]
+}
server/subscriber/internal/usecase/interactor/log_subscriber_test.go (3)

31-37: Consider adding more test cases for edge cases.

The test suite could be enhanced by adding test cases for the following (a sketch of the log-level case appears after this list):

  • Empty required fields (WorkflowID, JobID)
  • Different log levels
  • Maximum message size limits
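
A hedged sketch of the log-level case, in the same style as the existing sub-tests; mockStorage, u, ctx, and the domainLog identifiers are the fixtures already used in this file, and the expectation that every level is simply stored is an assumption.

t.Run("All log levels are processed", func(t *testing.T) {
	levels := []domainLog.LogLevel{
		domainLog.LogLevelError,
		domainLog.LogLevelWarn,
		domainLog.LogLevelInfo,
		domainLog.LogLevelDebug,
		domainLog.LogLevelTrace,
	}
	for _, level := range levels {
		event := &domainLog.LogEvent{
			WorkflowID: "wf-123",
			JobID:      "job-123",
			Timestamp:  time.Now(),
			LogLevel:   level,
			Message:    "Test message",
		}
		// Each event should be forwarded to Redis exactly once.
		mockStorage.On("SaveToRedis", ctx, event).Return(nil).Once()
		assert.NoError(t, u.ProcessLogEvent(ctx, event))
	}
	mockStorage.AssertExpectations(t)
})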

49-52: Enhance error message assertion.

Instead of using a generic error assertion, consider using assert.ErrorContains to verify the specific error message content.

-assert.Error(t, err, "event is nil")
+assert.ErrorContains(t, err, "event is nil")

54-69: Extract test data to reusable variables.

Consider extracting the test event data to a reusable test fixture to maintain consistency across test cases and reduce duplication.

+var testEvent = &domainLog.LogEvent{
+    WorkflowID: "wf-123",
+    JobID:      "job-123",
+    Timestamp:  time.Now(),
+    LogLevel:   domainLog.LogLevelInfo,
+    Message:    "Test message",
+}

 func TestLogSubscriberUseCase_ProcessLogEvent(t *testing.T) {
     // ... existing code ...
     t.Run("Error: storing to Redis fails", func(t *testing.T) {
-        event := &domainLog.LogEvent{
-            WorkflowID: "wf-123",
-            JobID:      "job-123",
-            Timestamp:  time.Now(),
-            LogLevel:   domainLog.LogLevelInfo,
-            Message:    "Test message",
-        }
+        event := testEvent
server/subscriber/internal/infrastructure/redis/redis_test.go (2)

20-27: Consider adding validation for Redis key expiration.

The mock Redis client's Set method should validate that the expiration duration matches the expected value.

 func (m *mockRedisClient) Set(ctx context.Context, key string, value interface{}, expiration time.Duration) *redis.StatusCmd {
+    if expiration != 12*time.Hour {
+        statusCmd := redis.NewStatusCmd(ctx)
+        statusCmd.SetErr(errors.New("invalid expiration duration"))
+        return statusCmd
+    }
     args := m.Called(ctx, key, value, expiration)
     // ... rest of the code

55-74: Add more error test cases.

Consider adding test cases for the following (a sketch of the timeout case appears after this list):

  • Redis connection timeout
  • Invalid JSON serialization
  • Key collision scenarios
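
A hedged sketch of the connection-timeout case, reusing the mClient and rStorage fixtures from this file; whether SaveLogToRedis wraps the underlying error is implementation-dependent, so only a generic error is asserted.

t.Run("Error: Redis connection timeout", func(t *testing.T) {
	event := &domainLog.LogEvent{
		WorkflowID: "wf-123",
		JobID:      "job-123",
		Timestamp:  time.Now(),
		LogLevel:   domainLog.LogLevelInfo,
		Message:    "Test message",
	}

	// Simulate a timed-out Set call by returning a StatusCmd carrying a deadline error.
	timeoutCmd := redis.NewStatusCmd(ctx)
	timeoutCmd.SetErr(context.DeadlineExceeded)
	mClient.
		On("Set", mock.Anything, mock.Anything, mock.Anything, mock.Anything).
		Return(timeoutCmd).Once()

	err := rStorage.SaveLogToRedis(ctx, event)
	assert.Error(t, err)
})
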
server/subscriber/internal/adapter/pubsub/subscriber_test.go (1)

57-65: Extract test data to reusable constants.

Consider extracting test event data to constants for better maintainability and reuse across test cases.

+const (
+    testWorkflowID = "workflow-123"
+    testJobID      = "job-abc"
+    testMessage    = "Hello from test"
+)
+
+var testEvent = domainLog.LogEvent{
+    WorkflowID: testWorkflowID,
+    JobID:      testJobID,
+    Timestamp:  time.Now(),
+    LogLevel:   domainLog.LogLevelInfo,
+    Message:    testMessage,
+}

 func TestSubscriber_StartListening_Success(t *testing.T) {
     // ... existing code ...
-    testEvent := domainLog.LogEvent{
-        WorkflowID: "workflow-123",
-        JobID:      "job-abc",
-        Timestamp:  time.Now(),
-        LogLevel:   domainLog.LogLevelInfo,
-        Message:    "Hello from test",
-    }
server/subscriber/Makefile (1)

7-8: Lint Target with Auto-Fix Enabled.
Using "golangci-lint run --fix" auto-corrects lint issues, which is useful for maintaining code style; however, ensure that auto-fixes are reviewed to prevent unintended changes.

📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 69e4b7f and 377bd6f.

⛔ Files ignored due to path filters (3)
  • go.work is excluded by !**/*.work
  • server/subscriber/README.md is excluded by !**/*.md
  • server/subscriber/go.sum is excluded by !**/*.sum
📒 Files selected for processing (22)
  • .github/workflows/ci.yml (6 hunks)
  • .github/workflows/ci_log_subscriber.yml (1 hunks)
  • api/pkg/i18n/string_test.go (1 hunks)
  • server/subscriber/.env.example (1 hunks)
  • server/subscriber/.gitignore (1 hunks)
  • server/subscriber/.golangci.yml (1 hunks)
  • server/subscriber/Dockerfile.subscriber (1 hunks)
  • server/subscriber/Makefile (1 hunks)
  • server/subscriber/cmd/log_subscriber/main.go (1 hunks)
  • server/subscriber/compose.yml (1 hunks)
  • server/subscriber/go.mod (1 hunks)
  • server/subscriber/internal/adapter/pubsub/message.go (1 hunks)
  • server/subscriber/internal/adapter/pubsub/subscriber.go (1 hunks)
  • server/subscriber/internal/adapter/pubsub/subscriber_test.go (1 hunks)
  • server/subscriber/internal/adapter/pubsub/subscription.go (1 hunks)
  • server/subscriber/internal/infrastructure/redis/redis.go (1 hunks)
  • server/subscriber/internal/infrastructure/redis/redis_test.go (1 hunks)
  • server/subscriber/internal/infrastructure/storage.go (1 hunks)
  • server/subscriber/internal/usecase/gateway/log_storage.go (1 hunks)
  • server/subscriber/internal/usecase/interactor/log_subscriber.go (1 hunks)
  • server/subscriber/internal/usecase/interactor/log_subscriber_test.go (1 hunks)
  • server/subscriber/pkg/log/log.go (1 hunks)
✅ Files skipped from review due to trivial changes (3)
  • server/subscriber/.golangci.yml
  • server/subscriber/.gitignore
  • server/subscriber/.env.example
🚧 Files skipped from review as they are similar to previous changes (2)
  • api/pkg/i18n/string_test.go
  • .github/workflows/ci.yml
⏰ Context from checks skipped due to timeout of 90000ms (2)
  • GitHub Check: ci-engine / ci
  • GitHub Check: ci-websocket / ci
🔇 Additional comments (24)
server/subscriber/compose.yml (3)

1-4: Network Declaration:
The external network "reearth-flow-net" is clearly and correctly configured. Ensure that this network already exists externally or provide clear documentation/instructions for its creation when deploying the stack.


5-14: Redis Service Configuration:
The Redis service uses the official Redis image (version 7) and is set to always restart. The port mapping ("6379:6379") is standard, though consider potential port conflicts on the host if Redis is already running locally. Overall, the configuration is spot on for a dedicated Redis instance within this service.


15-31: Subscriber Service Setup:
The subscriber service is well-configured:

  • The build context and Dockerfile reference are correct.
  • The "depends_on" directive ensures Redis starts before the subscriber, which is critical for proper operation.
  • Environment variables are set up for runtime substitution; however, ensure that sensitive data such as "FLOW_LOG_SUBSCRIBER_REDIS_PASSWORD" is securely managed (for instance, via external secrets or secure environment variable management) and not hard-coded or accidentally committed.
server/subscriber/go.mod (3)

1-3: Module Declaration & Go Version Consistency Check

Line 1 declares the module as "github.com/reearth/reearth-flow/log-subscriber" and line 3 specifies the Go version as 1.22.3. Please confirm that the module path accurately reflects the intended directory structure. Based on the PR summary, the log subscriber’s main logic is located in "server/log_subscriber", yet this go.mod is placed under "server/subscriber". Verify if this is an intentional design or if you’d like to align the directory name with the module name.


5-9: Direct Dependency Declarations

The direct dependencies for Pub/Sub, Redis, and the Testify package are correctly declared with pinned versions:

  • cloud.google.com/go/pubsub v1.45.1
  • github.com/redis/go-redis/v9 v9.7.0
  • github.com/stretchr/testify v1.10.0

These appear to support the intended log subscriber functionality. Make sure you routinely run "go mod tidy" to keep the dependency list updated.


11-52: Indirect Dependency Management

The indirect dependency block is comprehensive and appears auto-generated. There are no apparent issues here; however, please ensure that after further development or dependency updates, you run "go mod tidy" to remove any extraneous entries.

server/subscriber/internal/usecase/gateway/log_storage.go (1)

9-11: Well-designed interface following clean architecture principles!

The LogStorage interface provides a clear contract for Redis storage operations with proper error handling.

server/subscriber/internal/adapter/pubsub/message.go (1)

7-11: Clean interface design with proper abstraction!

The Message interface and its implementation provide a good abstraction over Google Cloud Pub/Sub messages.

Also applies to: 13-15

server/subscriber/internal/adapter/pubsub/subscription.go (1)

9-11: Well-structured subscription handling!

The subscription implementation provides a clean abstraction over Google Cloud Pub/Sub with proper message transformation.

Also applies to: 13-15, 21-25

server/subscriber/internal/infrastructure/storage.go (1)

11-13: Clean implementation with proper separation of concerns!

The storage implementation follows dependency injection and properly delegates to the Redis implementation.

Also applies to: 21-23

server/subscriber/internal/usecase/interactor/log_subscriber.go (1)

11-23: LGTM! Clean architecture implementation.

Good separation of concerns with clear interface definition and dependency injection through constructor.

server/subscriber/internal/infrastructure/redis/redis.go (1)

27-27: LGTM! Well-structured Redis key format.

Good use of UTC timestamp and logical key structure with workflow and job IDs.

server/subscriber/pkg/log/log.go (1)

22-29: LGTM! Well-structured log event model.

Good use of JSON tags and pointer for optional field.

server/subscriber/internal/adapter/pubsub/subscriber.go (1)

26-30: LGTM! Good panic recovery implementation.

Proper implementation of panic recovery with logging.
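
For readers skimming the thread, the pattern being praised looks roughly like the following hedged sketch (Message is the adapter interface defined earlier in this package; names and logging details in the real subscriber.go may differ):

package pubsub

import (
	"context"
	"log"
)

// handleMessage recovers from a panic in the per-message handler so one bad message
// cannot crash the whole Receive loop; the message is nacked so Pub/Sub can redeliver it.
func handleMessage(ctx context.Context, msg Message, process func(context.Context, Message) error) {
	defer func() {
		if r := recover(); r != nil {
			log.Printf("[log_subscriber] recovered from panic: %v", r)
			msg.Nack()
		}
	}()
	if err := process(ctx, msg); err != nil {
		msg.Nack()
		return
	}
	msg.Ack()
}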

server/subscriber/Makefile (2)

1-2: Test Target is well defined.
The "test" target runs all go tests with race detection, short mode, and verbose logging. This aligns well with best practices for Go testing.


4-6: Run Target executes Docker Compose correctly.
The "run" target uses "docker-compose up --build -d" to build and run in detached mode. This is a straightforward approach for local development and integration testing.

server/subscriber/Dockerfile.subscriber (5)

1-3: Base Image and ARG Setup.
The Dockerfile starts with a stable base (golang:1.22-alpine) and correctly defines build arguments (TAG and VERSION). This sets a clear and reproducible build environment.


5-10: Dependency Installation & Module Preparation.
Installing required packages (git, ca-certificates, build-base) and copying essential module files (go.mod, go.sum, main.go) before running "go mod download" follows common multi-stage build practices.


12-15: Copying Source Directories.
Copying the "cmd/", "pkg/", and "internal/" directories into the build context ensures that all necessary code is available for a complete build.


16-21: Go Build with Optimizations.
The build command disables CGO, applies build tags, sets linker flags for versioning and stripping debug information, and outputs the binary to a fixed path. This is a robust and optimized build process.


24-31: Final Stage with Minimal Base Image.
Using the "scratch" image for the final stage minimizes the image size and copying in the CA certificates ensures proper TLS handling. The working directory and command are set correctly to launch the subscriber.

.github/workflows/ci_log_subscriber.yml (3)

1-6: Workflow Definition and Environment Setup.
The workflow name, trigger (workflow_call), and global environment variable for GO_VERSION ("1.22") are clearly defined. This provides a solid foundation for the subsequent CI jobs.


7-25: Lint Job Configuration is Up-to-Date.
The "ci-log-subscriber-lint" job:
• Uses the updated versions (actions/checkout@v4 and actions/setup-go@v5), and
• Executes golangci-lint (v1.59.1) with a timeout limit, targeting the "server/subscriber" directory.
This ensures that code quality is automatically enforced.


26-44: Test Job is Configured for Reliable Testing.
The "ci-log-subscriber-test" job:
• Checks out the repository, sets up the Go environment with caching based on "server/subscriber/go.sum", and
• Runs tests with flags for verbosity, race detection, coverage generation, and a specified timeout.
Additionally, it uploads the coverage report using codecov (v4) with a proper token.
The conditional execution and clear working-directory specification help maintain clarity and reliability.

Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

🧹 Nitpick comments (2)
.github/workflows/ci.yml (2)

10-14: New Output for log_subscriber in Prepare Job
The addition on line 12 extends the outputs of the “prepare” job to include a log_subscriber trigger. This is crucial so that subsequent jobs (e.g. the dedicated log subscriber CI job) can run conditionally based on file changes.
Consider verifying that the file-change result is formatted as expected (usually the string 'true') for use in the conditions of downstream jobs. Also note that while the other steps use tj-actions/changed-files@v45, the step this output depends on uses a different version; verify that the mismatch is intentional.
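A sketch of that wiring, with the step id and output name as assumptions about this workflow:

prepare:
  runs-on: ubuntu-latest
  outputs:
    log_subscriber: ${{ steps.log_subscriber.outputs.any_changed }}
  steps:
    - uses: actions/checkout@v4
    - id: log_subscriber
      uses: tj-actions/changed-files@v41
      with:
        files: |
          server/log_subscriber/**
          .github/workflows/ci_log_subscriber.yml

ci-log-subscriber:
  needs: prepare
  if: needs.prepare.outputs.log_subscriber == 'true'
  uses: ./.github/workflows/ci_log_subscriber.yml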


33-42: Changed Files Detection for log_subscriber Step
The new step (lines 33–42) correctly configures the detection of changes related to the log_subscriber component. It monitors changes in the “server/log_subscriber/**” directory as well as the relevant workflow (ci.yml, ci_log_subscriber.yml) and CHANGELOG.md.
A minor point: the action version used here (tj-actions/changed-files@v41) is different from what’s used in other steps (v45). If this is not an intentional decision based on specific requirements, consider aligning the versions for consistency.

📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 377bd6f and f2f4106.

📒 Files selected for processing (1)
  • .github/workflows/ci.yml (6 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (1)
  • GitHub Check: ci-engine / ci
🔇 Additional comments (3)
.github/workflows/ci.yml (3)

78-81: Introduction of Dedicated ci-log-subscriber Job
The added CI job for log_subscriber (lines 78–81) is well structured. It declares a dependency on the “prepare” job and uses the condition (if: needs.prepare.outputs.log_subscriber == 'true') to only trigger when changes are detected. It properly points to the dedicated workflow file (ci_log_subscriber.yml).
Great job integrating this component into the CI flow.


96-102: Inclusion of ci-log-subscriber in the Aggregated CI Job Dependencies
Within the “ci” job (lines 96–102), the dependency list now includes “ci-log-subscriber”. This ensures that the overall CI workflow waits for the log subscriber tests/linting to complete before moving forward. This integration aligns well with the project’s modular CI setup.


157-158: Secrets Propagation in Build-and-Deploy-API Job
The addition of “secrets: inherit” on line 158 in the build-and-deploy-api job helps ensure that all necessary secrets are passed along consistently, matching the setup in other build-and-deploy jobs. This is a good practice for managing credentials securely across workflows.

@pyshx pyshx changed the title feat(log-subscriber): Add log_subscriber to write real-time logs to Redis feat(subscriber): add log_subscriber to write real-time logs to redis Feb 8, 2025
@soma00333 soma00333 requested a review from a team as a code owner February 13, 2025 07:08
Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🔭 Outside diff range comments (1)
api/mongo/WiredTiger.turtle (1)

1-7: ⚠️ Potential issue

Remove WiredTiger.turtle from version control.

This is an auto-generated MongoDB WiredTiger configuration file containing runtime state. It should not be committed to version control as it:

  1. Contains runtime-specific data like checkpoints and transaction IDs
  2. Is automatically generated and managed by MongoDB
  3. May expose sensitive configuration details

Add this file to .gitignore to prevent accidental commits.

Add the following to your .gitignore file:

+# MongoDB
+api/mongo/WiredTiger.turtle
🧹 Nitpick comments (3)
.github/workflows/ci_subscriber.yml (2)

5-5: Consider using a more specific Go version.

Using "1.22" might lead to unexpected behavior when patch versions are released. Consider specifying the exact version (e.g., "1.22.0") for better reproducibility.

-  GO_VERSION: "1.22"
+  GO_VERSION: "1.22.0"

37-38: Consider adding test coverage threshold.

While you're collecting coverage data, it would be beneficial to enforce a minimum coverage threshold to maintain code quality.

-        run: go test ./... -v -race -coverprofile=coverage.txt -covermode=atomic -timeout 10m
+        run: |
+          go test ./... -v -race -coverprofile=coverage.txt -covermode=atomic -timeout 10m
+          go tool cover -func=coverage.txt | awk 'END{if($3+0 < 80.0){print "Test coverage " $3 "% is below threshold of 80%"; exit 1}}'
.github/workflows/ci.yml (1)

90-93: Consider adding deployment workflow for the subscriber service.

Other services (api, ui, websocket) have corresponding deployment workflows. Consider adding a deployment workflow for the subscriber service to maintain consistency.

Would you like me to help create a deployment workflow for the subscriber service similar to other services?

📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between f2f4106 and 6f83606.

⛔ Files ignored due to path filters (4)
  • api/mongo/WiredTiger.lock is excluded by !**/*.lock, !**/*.lock
  • api/mongo/mongod.lock is excluded by !**/*.lock, !**/*.lock
  • go.work is excluded by !**/*.work
  • go.work.sum is excluded by !**/*.sum
📒 Files selected for processing (5)
  • .github/workflows/ci.yml (5 hunks)
  • .github/workflows/ci_subscriber.yml (1 hunks)
  • api/.env (1 hunks)
  • api/mongo/WiredTiger (1 hunks)
  • api/mongo/WiredTiger.turtle (1 hunks)
✅ Files skipped from review due to trivial changes (2)
  • api/mongo/WiredTiger
  • api/.env
⏰ Context from checks skipped due to timeout of 90000ms (2)
  • GitHub Check: ci-engine / ci
  • GitHub Check: ci-api / ci-api-test
🔇 Additional comments (1)
.github/workflows/ci.yml (1)

45-53: LGTM! File change detection is properly configured.

The subscriber file change detection follows the same pattern as other services and uses the correct action version.

@soma00333 soma00333 force-pushed the feat/log-subscriber/init branch from 3601549 to a0e3b1e Compare February 13, 2025 08:09
Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (1)
.github/workflows/ci.yml (1)

33-41: Fix indentation; the implementation otherwise LGTM.

The changed files detection for subscriber is correctly implemented, but there's a minor indentation issue to fix.

Fix the indentation in the with section:

        with:
-            files: |
+          files: |
🧰 Tools
🪛 YAMLlint (1.35.1)

[warning] 37-37: wrong indentation: expected 10 but found 12

(indentation)

📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 4ff81e7 and c19af53.

⛔ Files ignored due to path filters (1)
  • go.work.sum is excluded by !**/*.sum
📒 Files selected for processing (1)
  • .github/workflows/ci.yml (5 hunks)
🧰 Additional context used
🪛 YAMLlint (1.35.1)
.github/workflows/ci.yml

[warning] 37-37: wrong indentation: expected 10 but found 12

(indentation)

🔇 Additional comments (3)
.github/workflows/ci.yml (3)

12-12: LGTM! Output variable correctly added.

The subscriber output variable follows the established pattern and naming convention.


98-98: LGTM! Dependency correctly added.

The ci-subscriber job is properly added to the main CI job dependencies.


78-81:

✅ Verification successful

LGTM! CI job correctly configured.

The ci-subscriber job is properly integrated with appropriate dependencies and conditions.


🏁 Script executed:

#!/bin/bash
# Verify that the referenced workflow file exists
if [ ! -f .github/workflows/ci_subscriber.yml ]; then
  echo "Error: ci_subscriber.yml workflow file is missing"
  exit 1
fi

Length of output: 52


CI Job Configuration Verified and Approved

The ci-subscriber job is correctly integrated with the appropriate dependency on prepare and the required condition. The referenced file (.github/workflows/ci_subscriber.yml) has been verified to exist with no errors.

@@ -18,7 +18,7 @@ homepage = "https://github.com/reearth/reearth-flow"
license = "MIT OR Apache-2.0"
repository = "https://github.com/reearth/reearth-flow"
rust-version = "1.84" # Remember to update clippy.toml as well
version = "0.0.5"
version = "0.0.6"
Comment on lines +210 to +215
if len(tc.Target) != 0 {
	res["test"] = "bar"
	_, originalHasKey := tc.Target["test"]
	assert.False(t, originalHasKey, "original map should not have the new key")
	assert.Equal(t, "bar", res["test"])
}