Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(api): Add real-time logging functionality using Redis #780

Open
wants to merge 28 commits into
base: main
Choose a base branch
from

Conversation

soma00333
Copy link
Contributor

@soma00333 soma00333 commented Jan 18, 2025

Overview

This pull request introduces changes to add real-time logging functionality using Redis, along with various enhancements to the GraphQL API to support log retrieval. The most important changes include modifications to environment configuration, updates to GraphQL schemas and resolvers, and the addition of Redis-based log storage and retrieval.

Environment Configuration:

  • Added Redis configuration for real-time logging in api/.env.example and api/internal/app/config/log.go.

GraphQL API Enhancements:

  • Extended the Job type and added a new Log type and LogLevel enum in api/gql/job.graphql and api/gql/log.graphql.
  • Implemented GraphQL resolver for fetching logs in api/internal/adapter/gql/resolver_query.go.
  • Added conversion functions for logs in api/internal/adapter/gql/gqlmodel/convert_log.go and corresponding tests in api/internal/adapter/gql/gqlmodel/convert_log_test.go.

Redis-based Log Storage:

  • Introduced a new Redis log repository in api/internal/infrastructure/redis/log.go.
  • Integrated Redis log repository initialization in api/internal/app/repo.go.

Dependency Updates:

  • Added new dependencies for Redis in api/go.mod.

What I've done

What I haven't done

How I tested

 curl -X POST \
  -H "Content-Type: application/json" \
  -d '{
    "query": "query GetLogs($since: DateTime!, $jobId: ID!) { logs(since: $since, jobId: $jobId) { jobId nodeId timestamp logLevel message } }",
    "variables": {
      "since": "2023-01-01T00:00:00Z",
      "jobId": "5566c900-9581-4c5c-be02-fd13e4d93669"
    }
  }' \
  http://localhost:8080/api/graphql
{
  "data": {
    "logs": [
      {
        "jobId": "5566c900-9581-4c5c-be02-fd13e4d93669",
        "nodeId": null,
        "timestamp": "2025-02-01T05:04:20.064945Z",
        "logLevel": "INFO",
        "message": "\"FileWriter\" sink start..."
      },
      {
        "jobId": "5566c900-9581-4c5c-be02-fd13e4d93669",
        "nodeId": null,
        "timestamp": "2025-02-01T05:04:20.204786Z",
        "logLevel": "INFO",
        "message": "\"FeatureCreator\" finish source complete. elapsed = 848.542µs"
      },
      {
        "jobId": "5566c900-9581-4c5c-be02-fd13e4d93669",
        "nodeId": null,
        "timestamp": "2025-02-01T05:04:20.558451Z",
        "logLevel": "INFO",
        "message": "\"FileWriter\" sink finish. elapsed = 2.514292ms"
      }
    ]
  }
}

Screenshot

Which point I want you to review particularly

Memo

Summary by CodeRabbit

Summary by CodeRabbit

Based on the comprehensive summary, here are the updated release notes:

  • New Features

    • Added logging functionality with support for multiple log levels (ERROR, WARN, INFO, DEBUG, TRACE).
    • Implemented GraphQL log query endpoint to retrieve logs for specific workflows and jobs.
    • Integrated Redis as a logging backend for real-time log storage and retrieval.
    • New logs field added to the Job type in the GraphQL schema for querying associated logs.
  • Infrastructure

    • Added Redis configuration support for logging.
    • Enhanced logging interfaces and data models across multiple packages.
  • GraphQL

    • New logs query added to retrieve log entries with filtering capabilities.
    • Supports querying logs by job ID and timestamp.
  • Performance

    • Implemented efficient log retrieval mechanisms with context and timeout management.

Copy link
Contributor

coderabbitai bot commented Jan 18, 2025

Walkthrough

This pull request introduces a comprehensive logging system for the ReEarth Flow application, leveraging Redis for log storage and retrieval. The changes span multiple packages and include GraphQL schema modifications, infrastructure implementations, and test coverage. The new logging functionality allows retrieving logs for specific workflows and jobs, with support for different log levels (ERROR, WARN, INFO, DEBUG, TRACE) and flexible querying capabilities.

Changes

File Change Summary
api/go.mod Added Redis client library and related dependencies
api/gql/log.graphql Introduced GraphQL schema for logging with LogLevel enum and Log type
api/internal/adapter/gql/* Added GraphQL resolvers, loaders, and models for log functionality
api/pkg/log/log.go Created base logging package with Log struct and log levels
api/internal/infrastructure/redis/log.go Implemented Redis-based log repository
api/.env.example Added Redis configuration environment variables
api/internal/app/config/log.go Introduced Redis logging configuration struct
api/internal/usecase/gateway/log.go Added logging interface for log retrieval
api/internal/usecase/interactor/log.go Implemented LogInteractor for log retrieval logic
api/internal/adapter/gql/loader_log.go Created LogLoader for GraphQL log loading
api/internal/adapter/gql/resolver_query.go Added Logs method to queryResolver for fetching logs
api/internal/infrastructure/redis/log_test.go Added tests for Redis logging functionality
api/internal/usecase/interactor/log_test.go Added tests for LogInteractor functionality
api/pkg/log/log_test.go Added tests for log package functionality
api/internal/adapter/gql/convert_log_test.go Added tests for converting log types

Suggested labels

logging, redis, backend-enhancement, graphql

Suggested reviewers

  • KaWaite

Poem

🐰 Logs dancing in Redis streams,
Tracing workflows like rabbit dreams,
From ERROR to TRACE, we capture the flow,
With GraphQL magic, our logging will grow!
Hop, hop, hooray for structured insight! 🌈

✨ Finishing Touches
  • 📝 Generate Docstrings (Beta)

Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media?

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Generate unit testing code for this file.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai generate unit testing code for this file.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and generate unit testing code.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai generate docstrings to generate docstrings for this PR. (Beta)
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

Copy link

netlify bot commented Jan 18, 2025

Deploy Preview for reearth-flow canceled.

Name Link
🔨 Latest commit 78c75b5
🔍 Latest deploy log https://app.netlify.com/sites/reearth-flow/deploys/679dd0e25b3ce00008803d5e

Copy link

codecov bot commented Jan 18, 2025

Codecov Report

Attention: Patch coverage is 76.53631% with 42 lines in your changes missing coverage. Please review.

Project coverage is 23.04%. Comparing base (323e1f4) to head (78c75b5).
Report is 2 commits behind head on main.

Files with missing lines Patch % Lines
api/internal/app/repo.go 0.00% 16 Missing ⚠️
api/internal/app/config/log.go 0.00% 11 Missing ⚠️
api/internal/infrastructure/redis/log.go 90.00% 5 Missing and 3 partials ⚠️
api/internal/adapter/gql/loader_log.go 81.25% 2 Missing and 1 partial ⚠️
api/internal/adapter/gql/resolver_query.go 0.00% 2 Missing ⚠️
api/internal/adapter/gql/loader.go 0.00% 1 Missing ⚠️
api/internal/usecase/interactor/common.go 0.00% 1 Missing ⚠️
Additional details and impacted files

Impacted file tree graph

@@            Coverage Diff             @@
##             main     #780      +/-   ##
==========================================
- Coverage   25.42%   23.04%   -2.38%     
==========================================
  Files         137      143       +6     
  Lines        7713     7893     +180     
==========================================
- Hits         1961     1819     -142     
- Misses       5558     5920     +362     
+ Partials      194      154      -40     
Flag Coverage Δ
api 23.04% <76.53%> (-2.38%) ⬇️

Flags with carried forward coverage won't be shown. Click here to find out more.

Files with missing lines Coverage Δ
api/internal/adapter/gql/gqlmodel/convert_log.go 100.00% <100.00%> (ø)
api/internal/app/config/config.go 19.67% <ø> (ø)
api/internal/usecase/interactor/log.go 100.00% <100.00%> (ø)
api/pkg/log/log.go 100.00% <100.00%> (ø)
api/internal/adapter/gql/loader.go 0.00% <0.00%> (-64.11%) ⬇️
api/internal/usecase/interactor/common.go 15.21% <0.00%> (ø)
api/internal/adapter/gql/resolver_query.go 0.00% <0.00%> (-24.76%) ⬇️
api/internal/adapter/gql/loader_log.go 81.25% <81.25%> (ø)
api/internal/infrastructure/redis/log.go 90.00% <90.00%> (ø)
api/internal/app/config/log.go 0.00% <0.00%> (ø)
... and 1 more

... and 16 files with indirect coverage changes

@soma00333 soma00333 marked this pull request as ready for review January 18, 2025 13:00
@soma00333 soma00333 requested a review from pyshx as a code owner January 18, 2025 13:00
@soma00333 soma00333 enabled auto-merge (squash) January 18, 2025 13:01
@soma00333 soma00333 changed the title feat(api): Add Logs() feat(api): Add Logs() to extract realtime logging Jan 18, 2025
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 16

🧹 Nitpick comments (11)
api/internal/adapter/gql/generated.go (2)

2336-2356: Defining Log type and logs query in GraphQL schema

The Log type, along with the LogLevel enum, is well-defined in the GraphQL schema. The logs query is properly added, accepting since, workflowId, and jobId as non-nullable arguments, and returning a non-nullable list of Log objects.

Consider making jobId argument optional in the logs query

To enhance flexibility, consider making the jobId argument optional. This would allow users to retrieve logs for an entire workflow without specifying a specific job, which could be useful in scenarios where logs need to be analyzed at the workflow level.


2355-2355: Add pagination parameters to logs query

To handle large volumes of logs and improve performance, consider adding pagination parameters (first, last, after, before) to the logs query. This will help manage the size of the data returned and provide better control to the clients consuming the API.

api/internal/usecase/gateway/container.go (1)

7-8: Consider renaming fields for better clarity and separation of concerns.

The current field names LogRedis and LogGCS tightly couple the interface to specific implementations. Consider more abstract names like PrimaryLogger and SecondaryLogger or ShortTermLogger and LongTermLogger to better reflect their roles rather than their implementations.

api/internal/usecase/gateway/log.go (1)

11-13: Consider enhancing the Log interface for better scalability and clarity.

  1. The interface could benefit from pagination support to handle large volumes of logs efficiently.
  2. The time.Time parameter's purpose is ambiguous - consider adding a comment or renaming it to clarify if it's a start time, end time, or exact timestamp.
  3. Consider adding filtering options (e.g., log levels, time ranges) for more flexible log retrieval.

Example enhancement:

type LogFilter struct {
    Since      time.Time
    Until      time.Time
    LogLevels  []LogLevel
    Pagination *PaginationOptions
}

type Log interface {
    GetLogs(ctx context.Context, filter LogFilter, workflowID id.WorkflowID, jobID id.JobID) ([]*log.Log, *PaginationResult, error)
}
api/internal/adapter/gql/loader_log.go (2)

20-32: Add error wrapping for better error context.

The error handling could be improved by wrapping errors with additional context about the failure point.

 func (l *LogLoader) GetLogs(ctx context.Context, since time.Time, workflowID gqlmodel.ID, jobID gqlmodel.ID) ([]*gqlmodel.Log, error) {
 	newJobID, err := id.JobIDFrom(string(jobID))
 	if err != nil {
-		return nil, err
+		return nil, fmt.Errorf("invalid job ID: %w", err)
 	}
 	newWorkflowID, err := id.WorkflowIDFrom(string(workflowID))
 	if err != nil {
-		return nil, err
+		return nil, fmt.Errorf("invalid workflow ID: %w", err)
 	}
 	res, err := l.usecase.GetLogs(ctx, since, newWorkflowID, newJobID, getOperator(ctx))
 	if err != nil {
-		return nil, err
+		return nil, fmt.Errorf("failed to get logs: %w", err)
 	}

34-38: Consider using slice preallocation for better performance.

The slice preallocation is good, but consider returning early for empty results to avoid unnecessary allocations.

+	if len(res) == 0 {
+		return nil, nil
+	}
 	logs := make([]*gqlmodel.Log, 0, len(res))
 	for _, log := range res {
 		logs = append(logs, gqlmodel.ToLog(log))
 	}
 	return logs, nil
api/pkg/log/log.go (1)

15-22: Consider adding validation methods for required fields.

The Log struct should have methods to validate required fields and handle empty messages.

+func (l *Log) Validate() error {
+	if l.workflowID == "" || l.jobID == "" {
+		return fmt.Errorf("workflowID and jobID are required")
+	}
+	if l.message == "" {
+		return fmt.Errorf("log message cannot be empty")
+	}
+	return nil
+}
api/internal/app/repo.go (1)

73-75: Consider conditional initialization of log backends.

Both Redis and GCS log backends are initialized unconditionally. This could lead to unnecessary resource usage if not all backends are needed.

Consider initializing only the configured backends and storing them in a slice:

-    gateways.LogRedis = initLogRedis(ctx, conf)
-    gateways.LogGCS = initLogGCS(ctx, conf)
+    var logBackends []gateway.Log
+    if redisLog := initLogRedis(ctx, conf); redisLog != nil {
+        logBackends = append(logBackends, redisLog)
+    }
+    if gcsLog := initLogGCS(ctx, conf); gcsLog != nil {
+        logBackends = append(logBackends, gcsLog)
+    }
+    gateways.Logs = logBackends
api/internal/adapter/gql/resolver_query.go (1)

53-55: Consider adding pagination and time range validation.

The Logs query might return large result sets and could benefit from pagination. Additionally, consider validating the time range to prevent excessive queries.

Consider these improvements:

  1. Add pagination support:
-func (r *queryResolver) Logs(ctx context.Context, since time.Time, workflowId gqlmodel.ID, jobId gqlmodel.ID) ([]*gqlmodel.Log, error) {
+func (r *queryResolver) Logs(ctx context.Context, since time.Time, workflowId gqlmodel.ID, jobId gqlmodel.ID, pagination *gqlmodel.Pagination) (*gqlmodel.LogConnection, error) {
  1. Add time range validation:
 func (r *queryResolver) Logs(ctx context.Context, since time.Time, workflowId gqlmodel.ID, jobId gqlmodel.ID) ([]*gqlmodel.Log, error) {
+    maxRange := 24 * time.Hour
+    if time.Since(since) > maxRange {
+        return nil, fmt.Errorf("time range cannot exceed %v", maxRange)
+    }
     return loaders(ctx).Log.GetLogs(ctx, since, workflowId, jobId)
 }
api/gql/log.graphql (2)

1-7: Add documentation for the LogLevel enum.

Consider adding descriptions for the enum and its values to improve API documentation.

Apply this diff:

+"""
+Represents the severity level of a log entry.
+"""
 enum LogLevel {
-  ERROR
-  WARN
-  INFO
-  DEBUG
-  TRACE
+  """System is unusable or a critical error occurred."""
+  ERROR
+  """Warning messages for potentially harmful situations."""
+  WARN
+  """General informational messages about system operation."""
+  INFO
+  """Detailed messages useful for debugging."""
+  DEBUG
+  """Very detailed messages for tracing program execution."""
+  TRACE
 }

18-20: Consider making workflowId and jobId optional.

Making these parameters optional would allow querying logs across all workflows or jobs within a time range.

Apply this diff:

 extend type Query {
-  logs(since: DateTime!, workflowId: ID!, jobId: ID!): [Log!]!
+  """
+  Retrieve logs within a time range, optionally filtered by workflow and job.
+  """
+  logs(
+    """Start time for the log query."""
+    since: DateTime!,
+    """Filter logs by workflow ID."""
+    workflowId: ID,
+    """Filter logs by job ID."""
+    jobId: ID
+  ): [Log!]!
 }
📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 7ade697 and 6bd60e5.

⛔ Files ignored due to path filters (2)
  • api/go.sum is excluded by !**/*.sum
  • go.work.sum is excluded by !**/*.sum
📒 Files selected for processing (19)
  • api/go.mod (2 hunks)
  • api/gql/log.graphql (1 hunks)
  • api/internal/adapter/gql/generated.go (16 hunks)
  • api/internal/adapter/gql/gqlmodel/convert_log.go (1 hunks)
  • api/internal/adapter/gql/gqlmodel/models_gen.go (3 hunks)
  • api/internal/adapter/gql/loader.go (2 hunks)
  • api/internal/adapter/gql/loader_log.go (1 hunks)
  • api/internal/adapter/gql/resolver_query.go (2 hunks)
  • api/internal/app/repo.go (3 hunks)
  • api/internal/infrastructure/gcs/log.go (1 hunks)
  • api/internal/infrastructure/redis/log.go (1 hunks)
  • api/internal/usecase/gateway/container.go (1 hunks)
  • api/internal/usecase/gateway/log.go (1 hunks)
  • api/internal/usecase/interactor/common.go (1 hunks)
  • api/internal/usecase/interactor/log.go (1 hunks)
  • api/internal/usecase/interfaces/common.go (1 hunks)
  • api/internal/usecase/interfaces/log.go (1 hunks)
  • api/pkg/log/id.go (1 hunks)
  • api/pkg/log/log.go (1 hunks)
✅ Files skipped from review due to trivial changes (1)
  • api/pkg/log/id.go
🔇 Additional comments (14)
api/internal/adapter/gql/generated.go (9)

170-177: Addition of Log struct in ComplexityRoot

The Log struct has been correctly added to ComplexityRoot to define complexity functions for each field in the Log type. This ensures that complexity analysis for the Log fields is properly handled.


280-280: Addition of Logs function in ComplexityRoot.Query

The Logs function is appropriately added to the Query struct within ComplexityRoot to calculate the complexity of the logs query, accommodating the new parameters since, workflowID, and jobID.


421-421: Adding Logs method to QueryResolver interface

The Logs method is correctly added to the QueryResolver interface, allowing for the retrieval of logs based on the specified parameters. This enhances the GraphQL API's capability to fetch logs.


847-888: Implementing complexity estimations for Log fields

Complexity functions for each field in the Log type have been properly implemented in the switch cases. This ensures accurate complexity calculations during query execution, which is essential for optimizing GraphQL performance.


1591-1602: Handling arguments in Query.logs complexity function

The field_Query_logs_args function is correctly utilized to parse and validate the arguments for the logs query within the complexity estimation. Proper error handling is in place to ensure robustness.


3370-3402: Correct argument handling in field_Query_logs_args

The field_Query_logs_args function correctly parses the required arguments for the logs query. However, ensure that proper validation is in place for argument types and values to prevent potential runtime errors.


Line range hint 6434-11013: Implementing resolver methods for Log fields

The resolver methods for the Log type fields are correctly implemented, ensuring appropriate data retrieval and error handling. This allows for each field of the Log type to be properly resolved when queried.


17071-17131: Marshaling the Log type

The marshaling function for the Log type is properly implemented to serialize Log objects in GraphQL responses. This ensures that Log data is accurately represented in the API output.


19895-19958: Marshaling and unmarshaling of LogLevel and Log types

The marshal and unmarshal functions for both the LogLevel enum and the Log type are correctly implemented. This provides necessary serialization and deserialization support for these types within the GraphQL API.

api/internal/usecase/interfaces/log.go (1)

12-14: Document the relationship between interfaces.Log and gateway.Log.

  1. The relationship between interfaces.Log and gateway.Log is unclear. Consider adding package documentation explaining:
    • Why two similar interfaces exist
    • When to use each interface
    • The role of the Operator parameter
  2. Consider using interface embedding if interfaces.Log is meant to extend gateway.Log.

Let's check for any other similar interfaces in the codebase:

✅ Verification successful

Document the clean architecture pattern implemented by Log interfaces

The relationship between interfaces.Log and gateway.Log follows the clean architecture pattern:

  • gateway.Log: Infrastructure layer interface for storage implementations (Redis, GCS)
  • interfaces.Log: Application layer interface adding authorization capability via Operator
  • LogInteractor: Implementation that combines Redis (recent logs) and GCS (older logs)

Consider adding package documentation explaining this architecture and the currently unused Operator parameter's intended purpose for authorization.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for Log interface definitions
rg "type\s+Log\s+interface" -A 3

Length of output: 491


Script:

#!/bin/bash
# Search for implementations and usages
echo "=== Searching for GetLogs implementations ==="
ast-grep --pattern 'func $_($_, $_, $_, $_) ([]*log.Log, error)'

echo -e "\n=== Searching for interface usages ==="
rg "gateway\.Log|interfaces\.Log" -B 2 -A 2

echo -e "\n=== Checking for package documentation ==="
fd "^log.go$" --exec head -n 10 {}

Length of output: 3480


Script:

#!/bin/bash
# Check LogInteractor implementation
echo "=== LogInteractor Implementation ==="
cat api/internal/usecase/interactor/log.go

Length of output: 1069

api/internal/usecase/interfaces/common.go (1)

28-28: LGTM! Clean addition of Log field

The Log field is properly integrated into the Container struct, maintaining consistency with the existing structure.

api/internal/adapter/gql/gqlmodel/models_gen.go (1)

218-225: LGTM!

The implementation of the Log struct and LogLevel type is well-structured and follows standard patterns. The GQL marshaling/unmarshaling implementation is correct.

Also applies to: 597-642

api/go.mod (2)

44-45: LGTM! Indirect dependencies look good.

The indirect dependencies github.com/cespare/xxhash/v2 and github.com/dgryski/go-rendezvous are legitimate Redis client dependencies with appropriate versions.


20-20: ⚠️ Potential issue

Fix invalid Redis client version.

The specified version v9.7.0 of github.com/redis/go-redis/v9 does not exist in the official releases. The latest stable version is v9.5.1.

Apply this diff to fix the version:

-	github.com/redis/go-redis/v9 v9.7.0
+	github.com/redis/go-redis/v9 v9.5.1

Run the following script to verify the Redis client version and check for known vulnerabilities:

api/internal/adapter/gql/gqlmodel/convert_log.go Outdated Show resolved Hide resolved
api/internal/infrastructure/gcs/log.go Outdated Show resolved Hide resolved
api/internal/infrastructure/gcs/log.go Outdated Show resolved Hide resolved
api/internal/infrastructure/gcs/log.go Outdated Show resolved Hide resolved
api/internal/infrastructure/redis/log.go Show resolved Hide resolved
api/internal/adapter/gql/loader.go Show resolved Hide resolved
api/internal/usecase/interactor/common.go Outdated Show resolved Hide resolved
api/internal/app/repo.go Outdated Show resolved Hide resolved
api/internal/app/repo.go Outdated Show resolved Hide resolved
api/internal/app/repo.go Outdated Show resolved Hide resolved
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (2)
api/internal/infrastructure/gcs/log.go (1)

26-26: Enhance TODO comment with implementation details.

The TODO comment should outline the implementation steps for better clarity.

-	// TODO: Implement
+	// TODO: Implement GCS log retrieval
+	// 1. Use client.Bucket to get bucket handle
+	// 2. List objects with prefix matching workflowID/jobID
+	// 3. Filter objects modified after 'since' timestamp
+	// 4. Read and parse log entries from matching objects
+	// 5. Return parsed logs
api/internal/infrastructure/redis/log.go (1)

27-27: Enhance TODO comment with implementation details.

The TODO comment should outline the implementation steps for better clarity.

-	// TODO: Implement
+	// TODO: Implement Redis log retrieval
+	// 1. Construct Redis key pattern using workflowID/jobID
+	// 2. Use ZRANGEBYSCORE to get logs after 'since' timestamp
+	// 3. Deserialize and parse log entries
+	// 4. Return parsed logs
📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 6774f69 and 489f0d5.

📒 Files selected for processing (3)
  • api/internal/app/repo.go (3 hunks)
  • api/internal/infrastructure/gcs/log.go (1 hunks)
  • api/internal/infrastructure/redis/log.go (1 hunks)
🔇 Additional comments (5)
api/internal/infrastructure/gcs/log.go (1)

25-34: ⚠️ Potential issue

Fix GetLogs implementation to use provided parameters.

The current implementation generates new IDs instead of using the provided workflowID and jobID parameters. This will make it impossible to retrieve logs for specific workflows and jobs.

Apply this diff to fix the implementation:

 func (g *gcsLog) GetLogs(ctx context.Context, since time.Time, workflowID id.WorkflowID, jobID id.JobID) ([]*log.Log, error) {
 	// TODO: Implement
 	nodeID := log.NodeID(id.NewNodeID())
 	dummyLogs := []*log.Log{
-		log.NewLog(id.NewWorkflowID(), id.NewJobID(), nil, log.LevelInfo, "Test log message 1 from gcs"),
-		log.NewLog(id.NewWorkflowID(), id.NewJobID(), &nodeID, log.LevelDebug, "Test log message 2 from gcs"),
+		log.NewLog(workflowID, jobID, nil, log.LevelInfo, "Test log message 1 from gcs"),
+		log.NewLog(workflowID, jobID, &nodeID, log.LevelDebug, "Test log message 2 from gcs"),
 	}
 
 	return dummyLogs, nil
 }

Likely invalid or redundant comment.

api/internal/infrastructure/redis/log.go (1)

26-35: ⚠️ Potential issue

Fix receiver name and GetLogs implementation.

  1. The receiver variable 'g' should be 'r' to match the redisLog type
  2. The implementation generates new IDs instead of using the provided parameters

Apply this diff to fix the implementation:

-func (g *redisLog) GetLogs(ctx context.Context, since time.Time, workflowID id.WorkflowID, jobID id.JobID) ([]*log.Log, error) {
+func (r *redisLog) GetLogs(ctx context.Context, since time.Time, workflowID id.WorkflowID, jobID id.JobID) ([]*log.Log, error) {
 	// TODO: Implement
 	nodeID := log.NodeID(id.NewNodeID())
 	dummyLogs := []*log.Log{
-		log.NewLog(id.NewWorkflowID(), id.NewJobID(), nil, log.LevelInfo, "Test log message 1 from redis"),
-		log.NewLog(id.NewWorkflowID(), id.NewJobID(), &nodeID, log.LevelDebug, "Test log message 2 from redis"),
+		log.NewLog(workflowID, jobID, nil, log.LevelInfo, "Test log message 1 from redis"),
+		log.NewLog(workflowID, jobID, &nodeID, log.LevelDebug, "Test log message 2 from redis"),
 	}
 
 	return dummyLogs, nil
 }

Likely invalid or redundant comment.

api/internal/app/repo.go (3)

132-141: 🛠️ Refactor suggestion

Add Redis connection validation.

The Redis client creation should validate the connection by performing a PING command.

 func initLogRedis(ctx context.Context, conf *config.Config) gateway.Log {
+	if !conf.Redis.IsConfigured() {
+		return nil
+	}
 	client := redis.NewClient(&redis.Options{
 		Addr: conf.Redis.Addr,
+		Password: conf.Redis.Password,
+		DB: conf.Redis.DB,
 	})
+	if err := client.Ping(ctx).Err(); err != nil {
+		log.Fatalf("Failed to connect to Redis: %v", err)
+	}
 	logRedisRepo, err := redisrepo.NewRedisLog(client)
 	if err != nil {
 		log.Fatalf("Failed to create redis log repository: %v", err)
 	}
 	return logRedisRepo
 }

Likely invalid or redundant comment.


133-135: ⚠️ Potential issue

Replace hardcoded Redis configuration with values from config.

The Redis connection string is hardcoded to "localhost:6379". This should be configurable through the application's configuration system.

 	client := redis.NewClient(&redis.Options{
-		Addr: "localhost:6379",
+		Addr: conf.Redis.Addr,
+		Password: conf.Redis.Password,
+		DB: conf.Redis.DB,
 	})

Likely invalid or redundant comment.


143-153: 🛠️ Refactor suggestion

Add GCS configuration validation.

The GCS client initialization should check if GCS logging is enabled in the configuration.

 func initLogGCS(ctx context.Context, conf *config.Config) gateway.Log {
+	if !conf.GCS.IsConfigured() {
+		return nil
+	}
 	gcsClient, err := storage.NewClient(ctx)
 	if err != nil {
 		log.Fatalf("Failed to create gcs client: %v", err)
 	}
-	logGCSRepo, err := gcs.NewGCSLog(gcsClient)
+	logGCSRepo, err := gcs.NewGCSLog(gcsClient, conf.GCS.LogBucket)
 	if err != nil {
 		log.Fatalf("Failed to create gcs log repository: %v", err)
 	}
 	return logGCSRepo
 }

Likely invalid or redundant comment.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Nitpick comments (2)
api/internal/usecase/interactor/log.go (2)

25-27: Define default threshold as a package constant.

The default threshold of 60 minutes should be defined as a package constant for better maintainability and reusability.

+const defaultRecentLogsThreshold = 60 * time.Minute

 func NewLogInteractor(lgRedis gateway.Log, lgGCS gateway.Log, recentLogsThreshold time.Duration) interfaces.Log {
 	// ...
 	if recentLogsThreshold <= 0 {
-		recentLogsThreshold = 60 * time.Minute
+		recentLogsThreshold = defaultRecentLogsThreshold
 	}

37-39: Define query timeout as a package constant.

The 30-second timeout duration should be defined as a package constant for better maintainability.

+const defaultQueryTimeout = 30 * time.Second

 func (li *LogInteractor) GetLogs(...) ([]*log.Log, error) {
 	// Add timeout to prevent long-running queries
-	ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
+	ctx, cancel := context.WithTimeout(ctx, defaultQueryTimeout)
📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 489f0d5 and b8f0034.

📒 Files selected for processing (4)
  • api/internal/infrastructure/redis/log.go (1 hunks)
  • api/internal/usecase/interactor/common.go (2 hunks)
  • api/internal/usecase/interactor/log.go (1 hunks)
  • api/pkg/log/log.go (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (3)
  • api/internal/usecase/interactor/common.go
  • api/internal/infrastructure/redis/log.go
  • api/pkg/log/log.go
🧰 Additional context used
🪛 GitHub Actions: ci
api/internal/usecase/interactor/log.go

[error] 23-23: Panic: Log gateways cannot be nil

🔇 Additional comments (3)
api/internal/usecase/interactor/log.go (3)

15-19: LGTM! Well-structured struct definition.

The LogInteractor struct is well-designed with clear field names and appropriate types. The recentLogsThreshold field has been added as requested.


40-51: LGTM! Well-implemented log retrieval logic.

The implementation correctly:

  • Uses the configured threshold to choose between Redis and GCS
  • Properly handles errors with descriptive messages
  • Returns early on Redis success

36-36: 🛠️ Refactor suggestion

Remove unused operator parameter.

The operator parameter is not used in the method implementation and should be removed.

-func (li *LogInteractor) GetLogs(ctx context.Context, since time.Time, workflowID id.WorkflowID, jobID id.JobID, operator *usecase.Operator) ([]*log.Log, error) {
+func (li *LogInteractor) GetLogs(ctx context.Context, since time.Time, workflowID id.WorkflowID, jobID id.JobID) ([]*log.Log, error) {

Likely invalid or redundant comment.

api/internal/usecase/interactor/log.go Outdated Show resolved Hide resolved
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

♻️ Duplicate comments (2)
api/pkg/log/log.go (2)

7-13: 🛠️ Refactor suggestion

Add an IsValid method to validate log levels.

This suggestion duplicates a past comment about preventing invalid log levels. Please consider implementing an IsValid check in the Level type to ensure correctness and consistency throughout the system.

func (l Level) IsValid() bool {
	switch l {
	case LevelError, LevelWarn, LevelInfo, LevelDebug, LevelTrace:
		return true
	default:
		return false
	}
}

24-33: 🛠️ Refactor suggestion

Validate log level in the constructor.

This is also a duplicate request from a previous review to ensure that invalid log levels default to LevelInfo. Consider updating the constructor with validation logic to avoid confusion or errors down the line.

 func NewLog(workflowID WorkflowID, jobID JobID, nodeID *NodeID, time time.Time, level Level, message string) *Log {
+	if !level.IsValid() {
+		level = LevelInfo
+	}
 	return &Log{
 		workflowID: workflowID,
 		jobID:      jobID,
 		nodeID:     nodeID,
 		timestamp:  time,
 		level:      level,
 		message:    message,
 	}
 }
🧹 Nitpick comments (6)
api/internal/adapter/gql/loader_log_test.go (4)

17-19: Add a brief comment explaining the purpose of this mock.
Consider adding a short docstring or comment to help clarify that this mock is used to test the LogUsecase methods without interfacing with actual external resources.


21-24: Safeguard against potential type assertion issues.
When retrieving args.Get(0) as []*log.Log, consider checking for successful type conversion to avoid unexpected panics if the mock isn't set up correctly.


26-45: Verify log content in addition to log count.
Currently, the test only checks that the returned array has the same length as the mocked data. Consider verifying the actual log messages or levels to ensure full correctness.


47-61: Verify the specific error message.
While you assert an error occurs, you might also confirm that the returned error message matches “usecase error.” This approach offers stronger validation of the error-handling path.

api/internal/usecase/interactor/common.go (1)

31-31: Make log retention duration configurable.

The 60-minute duration is hardcoded. Consider making it configurable through ContainerConfig for better flexibility.

Apply this diff to make the duration configurable:

 type ContainerConfig struct {
 	SignupSecret    string
 	AuthSrvUIDomain string
+	LogRetention    time.Duration
 }

 func NewContainer(...) interfaces.Container {
 	job := NewJob(r, g)
-	log, err := NewLogInteractor(g.LogRedis, g.LogGCS, 60*time.Minute)
+	logRetention := config.LogRetention
+	if logRetention == 0 {
+		logRetention = 60 * time.Minute // default value
+	}
+	log, err := NewLogInteractor(g.LogRedis, g.LogGCS, logRetention)
api/internal/adapter/gql/gqlmodel/convert_log_test.go (1)

17-32: Consider using a fixed timestamp for test reproducibility.

Currently, the test uses time.Now(), which can cause slight variations in the timestamp. Using a fixed timestamp would improve test consistency and reduce flakiness.

- d := log.NewLog(wfid, jid, &nid, time.Now(), log.LevelInfo, "message")
+ fixedTime := time.Date(2025, 1, 1, 0, 0, 0, 0, time.UTC)
+ d := log.NewLog(wfid, jid, &nid, fixedTime, log.LevelInfo, "message")
📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between b8f0034 and 8f63e98.

📒 Files selected for processing (7)
  • api/internal/adapter/gql/gqlmodel/convert_log_test.go (1 hunks)
  • api/internal/adapter/gql/loader_log_test.go (1 hunks)
  • api/internal/infrastructure/gcs/log.go (1 hunks)
  • api/internal/infrastructure/redis/log.go (1 hunks)
  • api/internal/usecase/interactor/common.go (2 hunks)
  • api/internal/usecase/interactor/log.go (1 hunks)
  • api/pkg/log/log.go (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (3)
  • api/internal/infrastructure/redis/log.go
  • api/internal/infrastructure/gcs/log.go
  • api/internal/usecase/interactor/log.go
🔇 Additional comments (3)
api/internal/usecase/interactor/common.go (2)

5-5: LGTM!

The time package import is correctly added and necessary for the log duration configuration.


31-34: Backend failure handling needs improvement.

The LogInteractor initialization lacks error handling and fallback mechanisms for backend failures.

api/internal/adapter/gql/gqlmodel/convert_log_test.go (1)

12-16: Great coverage of nil input case.

Testing the nil input scenario ensures robust handling of edge cases. Having a dedicated test for it is a good practice and prevents potential runtime errors.

api/internal/usecase/interactor/common.go Outdated Show resolved Hide resolved
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (2)
api/pkg/log/log_test.go (2)

10-38: Well-structured test for log creation.

The overall logic for verifying each field of the newly created log instance looks clear and comprehensive. However, consider the implications of using time.Now() directly in tests, as minor clock skews or prolonged test runs might introduce a delay. Although unlikely, you could improve test stability by using a fixed time.Time value (e.g. via time.Date(...)) for extra consistency.


40-61: Recommend adding a test case for nil NodeID.

Since the code passes a pointer for NodeID, it might be valuable to verify behavior when the NodeID is intentionally set to nil, if supported by the application logic. This can help ensure stability for logs that don’t specify a node.

+func TestLogWithNilNodeID(t *testing.T) {
+    workflowID := id.NewWorkflowID()
+    jobID := id.NewJobID()
+    // Passing nil for nodeID
+    log := NewLog(workflowID, jobID, nil, time.Now(), LevelDebug, "Nil node test")
+
+    if log.NodeID() != nil {
+        t.Errorf("Expected NodeID to be nil, got %v", log.NodeID())
+    }
+}
📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 8f63e98 and 37151b9.

📒 Files selected for processing (1)
  • api/pkg/log/log_test.go (1 hunks)

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (2)
api/internal/usecase/interactor/log_test.go (2)

24-56: Extensive test coverage for “NewLogInteractor” instantiation.
The sub-tests correctly verify success cases, error handling, and default threshold assignment. As a small improvement, consider using require.NoError instead of assert.NoError in scenarios where subsequent checks depend on the absence of an error.


58-120: Comprehensive test coverage of log retrieval logic.
All edge cases—redis usage, GCS usage, and error scenarios—are handled thoroughly. If the codebase grows, consider a table-driven testing approach to streamline the repeated patterns in these sub-tests.

📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 37151b9 and e70e885.

📒 Files selected for processing (1)
  • api/internal/usecase/interactor/log_test.go (1 hunks)
🔇 Additional comments (2)
api/internal/usecase/interactor/log_test.go (2)

1-14: All import statements and package definitions look consistent.
No issues identified. The necessary dependencies are properly included, and the setup is straightforward for test functionality.


15-23: Mock gateway implementation looks solid.
The simplified mock returning static logs or errors is sufficient for testing. No concurrency issues or other concerns noted.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (3)
api/internal/usecase/interactor/log_test.go (3)

15-22: Enhance mock implementation.

Consider adding verification capabilities to the mock to ensure it's called with expected parameters.

Apply this diff:

 type mockLogGateway struct {
 	logs []*log.Log
 	err  error
+	calls []struct {
+		since      time.Time
+		workflowID id.WorkflowID
+		jobID      id.JobID
+	}
 }

 func (m *mockLogGateway) GetLogs(ctx context.Context, since time.Time, workflowID id.WorkflowID, jobID id.JobID) ([]*log.Log, error) {
+	m.calls = append(m.calls, struct {
+		since      time.Time
+		workflowID id.WorkflowID
+		jobID      id.JobID
+	}{since, workflowID, jobID})
 	return m.logs, m.err
 }

43-58: Extract test data setup to constants.

Consider extracting test data setup to constants or helper functions for better maintainability.

Add these constants at the package level:

+const (
+	testThreshold = time.Hour
+	testMessage1  = "redis log 1"
+	testMessage2  = "redis log 2"
+	testMessage3  = "gcs log 1"
+	testMessage4  = "gcs log 2"
+)
+
+func createTestLogs(workflowID id.WorkflowID, jobID id.JobID, nodeID *log.NodeID, messages ...string) []*log.Log {
+	logs := make([]*log.Log, len(messages))
+	for i, msg := range messages {
+		logs[i] = log.NewLog(workflowID, jobID, nodeID, time.Now(), log.LevelInfo, msg)
+	}
+	return logs
+}

59-101: Add test case for context timeout.

Add a test case to verify that the context timeout is working correctly.

Add this test case:

+	t.Run("context timeout", func(t *testing.T) {
+		slowMock := &mockLogGateway{
+			logs: redisLogs,
+			err:  context.DeadlineExceeded,
+		}
+		li := NewLogInteractor(slowMock, gcsMock, 1*time.Hour)
+
+		ctx, cancel := context.WithTimeout(context.Background(), 1*time.Millisecond)
+		defer cancel()
+		time.Sleep(2 * time.Millisecond) // Ensure timeout
+
+		out, err := li.GetLogs(ctx, time.Now(), id.NewWorkflowID(), id.NewJobID(), &usecase.Operator{})
+		assert.Nil(t, out)
+		assert.ErrorIs(t, err, context.DeadlineExceeded)
+	})
📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between e70e885 and d43a6fc.

📒 Files selected for processing (3)
  • api/internal/usecase/interactor/common.go (2 hunks)
  • api/internal/usecase/interactor/log.go (1 hunks)
  • api/internal/usecase/interactor/log_test.go (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • api/internal/usecase/interactor/common.go
🔇 Additional comments (4)
api/internal/usecase/interactor/log.go (3)

15-19: LGTM! Well-structured LogInteractor.

The struct is well-designed with clear separation between Redis and GCS gateways, and includes the configurable threshold as suggested.


21-31: ⚠️ Potential issue

Add validation for nil gateways.

The constructor should validate that neither gateway is nil to prevent runtime panics.

Apply this diff:

-func NewLogInteractor(lgRedis gateway.Log, lgGCS gateway.Log, recentLogsThreshold time.Duration) interfaces.Log {
+func NewLogInteractor(lgRedis gateway.Log, lgGCS gateway.Log, recentLogsThreshold time.Duration) (interfaces.Log, error) {
+	if lgRedis == nil || lgGCS == nil {
+		return nil, fmt.Errorf("invalid gateways: Redis=%v, GCS=%v", lgRedis != nil, lgGCS != nil)
+	}
 	if recentLogsThreshold <= 0 {
 		recentLogsThreshold = 60 * time.Minute
 	}
 	return &LogInteractor{
 		logsGatewayRedis:    lgRedis,
 		logsGatewayGCS:      lgGCS,
 		recentLogsThreshold: recentLogsThreshold,
-	}
+	}, nil
 }

Likely invalid or redundant comment.


33-49: 🛠️ Refactor suggestion

Remove unused operator parameter.

The operator parameter is not used in the implementation and should be removed.

Apply this diff:

-func (li *LogInteractor) GetLogs(ctx context.Context, since time.Time, workflowID id.WorkflowID, jobID id.JobID, operator *usecase.Operator) ([]*log.Log, error) {
+func (li *LogInteractor) GetLogs(ctx context.Context, since time.Time, workflowID id.WorkflowID, jobID id.JobID) ([]*log.Log, error) {

Likely invalid or redundant comment.

api/internal/usecase/interactor/log_test.go (1)

24-41: 🛠️ Refactor suggestion

Add test cases for nil gateway validation.

Once the nil gateway validation is implemented in the constructor, add corresponding test cases.

Add this test case:

 func TestNewLogInteractor(t *testing.T) {
+	t.Run("should return error for nil gateways", func(t *testing.T) {
+		li, err := NewLogInteractor(nil, &mockLogGateway{}, time.Hour)
+		assert.Error(t, err)
+		assert.Nil(t, li)
+
+		li, err = NewLogInteractor(&mockLogGateway{}, nil, time.Hour)
+		assert.Error(t, err)
+		assert.Nil(t, li)
+	})

Likely invalid or redundant comment.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (11)
api/internal/usecase/gateway/log.go (1)

11-13: Add interface documentation.

Consider adding documentation to describe:

  • The purpose and responsibility of the Log interface
  • The expected behavior of GetLogs method
  • The meaning of since/until parameters (inclusive/exclusive)
+// Log defines the interface for retrieving logs from storage backends.
 type Log interface {
+    // GetLogs retrieves logs within the specified time range for a given workflow and job.
+    // The time range is inclusive of both since and until timestamps.
     GetLogs(ctx context.Context, since time.Time, until time.Time, workflowID id.WorkflowID, jobID id.JobID) ([]*log.Log, error)
 }
api/internal/infrastructure/gcs/real_gcs.go (2)

27-45: Consider implementing pagination for large object lists.

The current implementation loads all object names into memory. For buckets with many objects, this could consume significant memory. Consider implementing pagination or streaming for large result sets.


47-59: Add context timeout handling for object reads.

The ReadObject method should respect context cancellation and timeouts. Consider checking ctx.Done() in a select statement when reading large objects.

 func (b *realGCSBucket) ReadObject(ctx context.Context, objectName string) ([]byte, error) {
     r, err := b.bucket.Object(objectName).NewReader(ctx)
     if err != nil {
         return nil, err
     }
     defer r.Close()
 
-    data, err := io.ReadAll(r)
+    // Create a channel for the read operation
+    dataCh := make(chan []byte)
+    errCh := make(chan error)
+    
+    go func() {
+        data, err := io.ReadAll(r)
+        if err != nil {
+            errCh <- err
+            return
+        }
+        dataCh <- data
+    }()
+
+    // Wait for either completion or context cancellation
+    select {
+    case data := <-dataCh:
+        return data, nil
+    case err := <-errCh:
+        return nil, err
+    case <-ctx.Done():
+        return nil, ctx.Err()
+    }
-    if err != nil {
-        return nil, err
-    }
-    return data, nil
 }
api/internal/infrastructure/gcs/log_test.go (2)

53-120: Add concurrent access test case.

The current test suite doesn't verify behavior under concurrent access. Consider adding a test case that simulates multiple goroutines accessing logs simultaneously.


274-295: Test error message format for date range limit.

The test verifies that an error is returned for large date ranges but doesn't validate the exact error message format. Consider adding assertions for a standardized error message format.

api/internal/infrastructure/redis/log_test.go (2)

322-322: Translate Japanese comment to English.

The comment "since と until に一致する2件のみ含まれるはず" should be in English for consistency.

-    assert.Len(t, result, 2, "since と until に一致する2件のみ含まれるはず")
+    assert.Len(t, result, 2, "should only include 2 logs matching since and until timestamps")

331-331: Translate Japanese comment to English.

The comment "until より後なので含まれない" should be in English for consistency.

-    assert.False(t, foundMsgs["after until"], "until より後なので含まれない")
+    assert.False(t, foundMsgs["after until"], "should not include logs after until timestamp")
api/internal/infrastructure/gcs/log.go (2)

150-153: Add context timeout for GCS operations.

GCS operations should have a timeout to prevent hanging in case of network issues or slow responses.

 		// List all objects under the prefix
+		ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
+		defer cancel()
 		objectNames, err := bucket.ListObjects(ctx, prefix)
 		if err != nil {
 			return nil, fmt.Errorf("failed to list objects: %w", err)
 		}

156-160: Add context timeout for object read operations.

Similar to listing objects, reading objects should also have a timeout.

+			ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
+			defer cancel()
 			data, err := bucket.ReadObject(ctx, objName)
 			if err != nil {
 				reearth_log.Warnfc(ctx, "gcsLog: failed to read object (%s): %v", objName, err)
 				continue
 			}
api/internal/infrastructure/redis/log.go (1)

109-116: Consider using Redis pipelining for better performance.

When fetching multiple keys, using Redis pipelining can significantly improve performance by reducing network round trips.

+		pipe := r.client.Pipeline()
+		cmds := make([]*redis.StringCmd, len(keys))
 		for _, key := range keys {
-			val, err := r.client.Get(ctx, key).Result()
-			if err == redis.Nil {
-				// Skip if key does not exist (deleted)
-				continue
-			} else if err != nil {
-				return nil, fmt.Errorf("failed to get redis value for key=%s: %w", key, err)
-			}
+			cmds = append(cmds, pipe.Get(ctx, key))
+		}
+		
+		_, err = pipe.Exec(ctx)
+		if err != nil && err != redis.Nil {
+			return nil, fmt.Errorf("failed to execute pipeline: %w", err)
+		}
+
+		for i, cmd := range cmds {
+			val, err := cmd.Result()
+			if err == redis.Nil {
+				continue
+			}
api/internal/usecase/interactor/log_test.go (1)

43-101: Add test for concurrent log access.

The current tests don't verify behavior under concurrent access. Consider adding a test case that simulates multiple goroutines accessing logs simultaneously.

+	t.Run("concurrent access", func(t *testing.T) {
+		li := NewLogInteractor(redisMock, gcsMock, 1*time.Hour)
+		var wg sync.WaitGroup
+		const numGoroutines = 10
+
+		for i := 0; i < numGoroutines; i++ {
+			wg.Add(1)
+			go func() {
+				defer wg.Done()
+				since := time.Now()
+				out, err := li.GetLogs(context.Background(), since, id.NewWorkflowID(), id.NewJobID(), &usecase.Operator{})
+				assert.NoError(t, err)
+				assert.NotNil(t, out)
+			}()
+		}
+		wg.Wait()
+	})
📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between d43a6fc and c4987ee.

⛔ Files ignored due to path filters (2)
  • api/go.sum is excluded by !**/*.sum
  • go.work.sum is excluded by !**/*.sum
📒 Files selected for processing (10)
  • api/go.mod (2 hunks)
  • api/internal/app/repo.go (3 hunks)
  • api/internal/infrastructure/gcs/log.go (1 hunks)
  • api/internal/infrastructure/gcs/log_test.go (1 hunks)
  • api/internal/infrastructure/gcs/real_gcs.go (1 hunks)
  • api/internal/infrastructure/redis/log.go (1 hunks)
  • api/internal/infrastructure/redis/log_test.go (1 hunks)
  • api/internal/usecase/gateway/log.go (1 hunks)
  • api/internal/usecase/interactor/log.go (1 hunks)
  • api/internal/usecase/interactor/log_test.go (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • api/internal/usecase/interactor/log.go
🔇 Additional comments (3)
api/go.mod (1)

20-20: Consider updating go-redis to the latest version.

The current version (v9.7.0) is not the latest. Consider updating to the latest stable version for potential bug fixes and improvements.

Run this script to check for the latest version and any security advisories:

api/internal/app/repo.go (2)

133-135: ⚠️ Potential issue

Replace hardcoded Redis configuration with configuration from config.Config.

The Redis connection string should not be hardcoded and should be configurable.

 	client := redis.NewClient(&redis.Options{
-		Addr: "localhost:6379",
+		Addr:     conf.Redis.Addr,
+		Password: conf.Redis.Password,
+		DB:       conf.Redis.DB,
 	})

Likely invalid or redundant comment.


132-141: 🛠️ Refactor suggestion

Add Redis configuration validation and connection testing.

The Redis client initialization should validate the configuration and test the connection.

 func initLogRedis(ctx context.Context, conf *config.Config) gateway.Log {
+	if conf.Redis == nil || conf.Redis.Addr == "" {
+		log.Warn("Redis configuration not found, skipping Redis log initialization")
+		return nil
+	}
+
 	client := redis.NewClient(&redis.Options{
 		Addr: conf.Redis.Addr,
 	})
+
+	if err := client.Ping(ctx).Err(); err != nil {
+		log.Fatalf("Failed to connect to Redis: %v", err)
+	}
+
 	logRedisRepo, err := redisrepo.NewRedisLog(client)
 	if err != nil {
 		log.Fatalf("Failed to create redis log repository: %v", err)
 	}
 	return logRedisRepo
 }

Likely invalid or redundant comment.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

♻️ Duplicate comments (1)
api/internal/app/repo.go (1)

146-157: 🛠️ Refactor suggestion

Add configuration validation before GCS initialization.

Add a configuration check before initializing GCS to prevent unnecessary initialization when GCS is not configured.

 func initLogGCS(ctx context.Context, conf *config.Config) gateway.Log {
+	if !conf.GCS.IsConfigured() {
+		return nil
+	}
 	c, err := storage.NewClient(ctx)
 	if err != nil {
 		log.Fatalf("Failed to create gcs client: %v", err)
 	}
 	gcsClient := gcs.NewRealGCSClient(c)
 	logGCSRepo, err := gcs.NewGCSLog(gcsClient, conf.GCS.BucketName)
 	if err != nil {
 		log.Fatalf("Failed to create gcs log repository: %v", err)
 	}
 	return logGCSRepo
 }
🧹 Nitpick comments (1)
api/.env.example (1)

20-23: Add documentation for Redis environment variables.

Add comments explaining the purpose and expected values for each Redis configuration variable.

 # Redis for Realtime Logging
-REEARTH_FLOW_REDIS_ADDR=localhost:6379
-REEARTH_FLOW_REDIS_PASSWORD=
-REEARTH_FLOW_REDIS_DB=
+# Redis server address (default: localhost:6379)
+REEARTH_FLOW_REDIS_ADDR=localhost:6379
+# Redis password (optional)
+REEARTH_FLOW_REDIS_PASSWORD=
+# Redis database number (default: 0)
+REEARTH_FLOW_REDIS_DB=
📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between c4987ee and ccf340e.

📒 Files selected for processing (4)
  • api/.env.example (1 hunks)
  • api/internal/app/config/config.go (1 hunks)
  • api/internal/app/config/redis.go (1 hunks)
  • api/internal/app/repo.go (3 hunks)
🔇 Additional comments (4)
api/internal/app/config/config.go (1)

58-60: LGTM!

The Redis configuration field is well-placed and follows the existing configuration pattern.

api/internal/app/repo.go (3)

6-7: LGTM!

The import statements are properly organized and include all necessary dependencies.

Also applies to: 14-14


73-75: LGTM!

The log repository initialization is well-integrated into the existing gateway initialization flow.


132-144: 🛠️ Refactor suggestion

Add configuration validation before Redis initialization.

Add a configuration check before initializing Redis to prevent unnecessary initialization when Redis is not configured.

 func initLogRedis(ctx context.Context, conf *config.Config) gateway.Log {
+	if !conf.Redis.IsConfigured() {
+		return nil
+	}
 	opts := &redis.Options{
 		Addr:     conf.Redis.Addr,
 		Password: conf.Redis.Password,
 		DB:       conf.Redis.DB,
 	}
 	client := redis.NewClient(opts)
+	if err := client.Ping(ctx).Err(); err != nil {
+		log.Fatalf("Failed to connect to Redis: %v", err)
+	}
 	logRedisRepo, err := redisrepo.NewRedisLog(client)
 	if err != nil {
 		log.Fatalf("Failed to create redis log repository: %v", err)
 	}
 	return logRedisRepo
 }

Likely invalid or redundant comment.

api/internal/app/config/redis.go Outdated Show resolved Hide resolved
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 6

🧹 Nitpick comments (6)
api/.env.example (2)

20-23: Add Redis configuration documentation and connection timeout.

The Redis configuration should include documentation about each setting's purpose and additional configuration options for production deployments.

Add these configurations and documentation:

 # Redis for Realtime Logging
+# REEARTH_FLOW_REDIS_ADDR: Redis server address in host:port format
 REEARTH_FLOW_REDIS_ADDR=localhost:6379
+# REEARTH_FLOW_REDIS_PASSWORD: Redis server password (optional)
 REEARTH_FLOW_REDIS_PASSWORD=
+# REEARTH_FLOW_REDIS_DB: Redis database number
 REEARTH_FLOW_REDIS_DB="0"
+# REEARTH_FLOW_REDIS_TIMEOUT: Connection timeout in seconds
+REEARTH_FLOW_REDIS_TIMEOUT=5

25-27: Add GCS log bucket documentation and region configuration.

The GCS log configuration should include documentation and region settings for compliance requirements.

Add these configurations and documentation:

 # GCS for Realtime Logging
+# REEARTH_FLOW_GCSLOG_BUCKETNAME: GCS bucket name for storing logs
 REEARTH_FLOW_GCSLOG_BUCKETNAME=bucket_name
+# REEARTH_FLOW_GCSLOG_PUBLICATIONCACHECONTROL: Cache control header for log objects
 REEARTH_FLOW_GCSLOG_PUBLICATIONCACHECONTROL=
+# REEARTH_FLOW_GCSLOG_LOCATION: GCS bucket location (e.g., US-EAST1)
+REEARTH_FLOW_GCSLOG_LOCATION=
api/internal/app/repo.go (1)

6-7: Group related imports together.

The Redis and GCS imports should be grouped with other infrastructure imports for better organization.

Reorder imports:

-"cloud.google.com/go/storage"
-"github.com/redis/go-redis/v9"
 "github.com/reearth/reearth-flow/api/internal/app/config"
 "github.com/reearth/reearth-flow/api/internal/infrastructure/auth0"
 "github.com/reearth/reearth-flow/api/internal/infrastructure/fs"
 "github.com/reearth/reearth-flow/api/internal/infrastructure/gcpbatch"
 "github.com/reearth/reearth-flow/api/internal/infrastructure/gcs"
+"cloud.google.com/go/storage"
+"github.com/redis/go-redis/v9"
 mongorepo "github.com/reearth/reearth-flow/api/internal/infrastructure/mongo"
 redisrepo "github.com/reearth/reearth-flow/api/internal/infrastructure/redis"

Also applies to: 14-14

api/internal/usecase/interactor/log_test.go (3)

15-22: Enhance mock implementation for better test coverage.

The current mock is too simple and doesn't validate input parameters. Consider:

  1. Add parameter validation
  2. Track method calls for verification
  3. Add ability to return different results for different inputs
 type mockLogGateway struct {
 	logs []*log.Log
 	err  error
+	// Track calls for verification
+	calls []struct {
+		since     time.Time
+		until     time.Time
+		workflowID id.WorkflowID
+		jobID     id.JobID
+	}
 }
 
 func (m *mockLogGateway) GetLogs(ctx context.Context, since time.Time, until time.Time, workflowID id.WorkflowID, jobID id.JobID) ([]*log.Log, error) {
+	// Track method call
+	m.calls = append(m.calls, struct {
+		since     time.Time
+		until     time.Time
+		workflowID id.WorkflowID
+		jobID     id.JobID
+	}{since, until, workflowID, jobID})
+
+	// Validate input parameters
+	if since.IsZero() {
+		return nil, errors.New("since time is zero")
+	}
+	if until.IsZero() {
+		return nil, errors.New("until time is zero")
+	}
+	if since.After(until) {
+		return nil, errors.New("since time is after until time")
+	}
 	return m.logs, m.err
 }

79-120: Add test cases for missing error scenarios.

Consider adding the following test cases:

  1. Context cancellation handling
  2. Empty log results from both Redis and GCS
  3. Edge cases around the threshold boundary

Example additional test cases:

t.Run("context cancellation", func(t *testing.T) {
    li := NewLogInteractor(redisMock, gcsMock, 1*time.Hour)
    ctx, cancel := context.WithCancel(context.Background())
    cancel()
    
    out, err := li.GetLogs(ctx, time.Now(), id.NewWorkflowID(), id.NewJobID(), &usecase.Operator{})
    assert.Nil(t, out)
    assert.Error(t, err)
    assert.Contains(t, err.Error(), "context canceled")
})

t.Run("empty logs from redis", func(t *testing.T) {
    emptyRedisMock := &mockLogGateway{logs: []*log.Log{}}
    li := NewLogInteractor(emptyRedisMock, gcsMock, 1*time.Hour)
    
    out, err := li.GetLogs(context.Background(), time.Now(), id.NewWorkflowID(), id.NewJobID(), &usecase.Operator{})
    assert.NoError(t, err)
    assert.Empty(t, out)
})

t.Run("exactly at threshold boundary", func(t *testing.T) {
    li := NewLogInteractor(redisMock, gcsMock, 1*time.Hour)
    since := time.Now().Add(-1 * time.Hour)
    
    out, err := li.GetLogs(context.Background(), since, id.NewWorkflowID(), id.NewJobID(), &usecase.Operator{})
    assert.NoError(t, err)
    // Verify which gateway was used at exactly the threshold
})

104-104: Translate comments to English for consistency.

Replace Japanese comments with English for better maintainability:

-		since := time.Now().Add(-30 * time.Minute) // Redis 側にアクセスするケース
+		since := time.Now().Add(-30 * time.Minute) // Case: Accessing Redis gateway
-		since := time.Now().Add(-2 * time.Hour) // GCS 側にアクセスするケース
+		since := time.Now().Add(-2 * time.Hour) // Case: Accessing GCS gateway

Also applies to: 114-114

📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between ccf340e and 0f49a03.

📒 Files selected for processing (6)
  • api/.env.example (1 hunks)
  • api/internal/app/config/config.go (1 hunks)
  • api/internal/app/config/log.go (1 hunks)
  • api/internal/app/repo.go (3 hunks)
  • api/internal/usecase/interactor/log.go (1 hunks)
  • api/internal/usecase/interactor/log_test.go (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
  • api/internal/app/config/config.go
  • api/internal/usecase/interactor/log.go
🔇 Additional comments (1)
api/internal/app/repo.go (1)

73-75: LGTM! Good separation of concerns.

The initialization of log repositories is cleanly separated into dedicated functions.

api/internal/app/repo.go Outdated Show resolved Hide resolved
api/internal/app/repo.go Outdated Show resolved Hide resolved
api/internal/app/repo.go Show resolved Hide resolved
api/internal/app/config/log.go Outdated Show resolved Hide resolved
api/internal/app/config/log.go Outdated Show resolved Hide resolved
api/internal/usecase/interactor/log_test.go Outdated Show resolved Hide resolved
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

♻️ Duplicate comments (3)
api/internal/app/config/log.go (1)

26-33: ⚠️ Potential issue

Enhance GCS configuration validation.

The current implementation only checks if the bucket name is non-empty. Consider adding comprehensive validation.

 type GCSLogConfig struct {
+	// BucketName is the name of the GCS bucket where logs will be stored
+	// Format: my-bucket-name (3-63 characters, lowercase, numbers, hyphens)
 	BucketName              string `pp:",omitempty"`
+	// PublicationCacheControl defines cache behavior for published logs
+	// Format: public, max-age=3600
 	PublicationCacheControl string `pp:",omitempty"`
 }

 func (g GCSLogConfig) IsConfigured() bool {
-	return g.BucketName != ""
+	if g.BucketName == "" {
+		return false
+	}
+	// Basic GCS bucket name validation
+	// Must be 3-63 characters, lowercase, numbers, hyphens
+	if !regexp.MustCompile(`^[a-z0-9][a-z0-9-]{1,61}[a-z0-9]$`).MatchString(g.BucketName) {
+		return false
+	}
+	if g.PublicationCacheControl != "" {
+		// Validate cache control format
+		validDirectives := []string{"public", "private", "no-cache", "max-age="}
+		isValid := false
+		for _, directive := range validDirectives {
+			if strings.Contains(g.PublicationCacheControl, directive) {
+				isValid = true
+				break
+			}
+		}
+		if !isValid {
+			return false
+		}
+	}
+	return true
 }
api/internal/app/repo.go (2)

132-148: ⚠️ Potential issue

Add Redis connection health check and cleanup.

The Redis client initialization should include a connection health check and proper cleanup.

 func initLogRedis(ctx context.Context, conf *config.Config) gateway.Log {
 	if conf.RedisLog.IsConfigured() {
 		log.Infofc(ctx, "log: redis storage is used: %s\n", conf.RedisLog.Addr)
 		opts := &redis.Options{
 			Addr:     conf.RedisLog.Addr,
 			Password: conf.RedisLog.Password,
 			DB:       conf.RedisLog.DB,
+			ConnTimeout: time.Second * 5,
 		}
 		client := redis.NewClient(opts)
+		
+		// Check connection health
+		if err := client.Ping(ctx).Err(); err != nil {
+			log.Warnf("log: redis connection failed: %s\n", err.Error())
+			return nil
+		}
+		
+		// Register cleanup on context done
+		go func() {
+			<-ctx.Done()
+			if err := client.Close(); err != nil {
+				log.Warnf("log: failed to close redis connection: %s\n", err.Error())
+			}
+		}()
 		
 		logRedisRepo, err := redisrepo.NewRedisLog(client)
 		if err != nil {
 			log.Warnf("log: failed to init redis storage: %s\n", err.Error())
+			if err := client.Close(); err != nil {
+				log.Warnf("log: failed to close redis connection: %s\n", err.Error())
+			}
 		}
 		return logRedisRepo
 	}
 	return nil
 }

150-165: ⚠️ Potential issue

Add GCS client cleanup and retry configuration.

The GCS client initialization should include proper cleanup and retry configuration for better reliability.

 func initLogGCS(ctx context.Context, conf *config.Config) gateway.Log {
 	if conf.GCSLog.IsConfigured() {
 		log.Infofc(ctx, "log: GCS storage is used: %s\n", conf.GCSLog.BucketName)
-		c, err := storage.NewClient(ctx)
+		c, err := storage.NewClient(ctx, 
+			storage.WithRetry(
+				storage.WithBackoff(gax.Backoff{
+					Initial:    time.Second,
+					Max:        time.Second * 10,
+					Multiplier: 2,
+				}),
+				storage.WithPolicy(storage.RetryIdempotent),
+			),
+		)
 		if err != nil {
 			log.Warnf("log: failed to init GCS storage: %s\n", err.Error())
+			return nil
 		}
+		
+		// Register cleanup on context done
+		go func() {
+			<-ctx.Done()
+			if err := c.Close(); err != nil {
+				log.Warnf("log: failed to close gcs client: %s\n", err.Error())
+			}
+		}()
 		
 		gcsClient := gcs.NewRealGCSClient(c)
 		logGCSRepo, err := gcs.NewGCSLog(gcsClient, conf.GCSLog.BucketName)
 		if err != nil {
 			log.Warnf("log: failed to init GCS storage: %s\n", err.Error())
+			if err := c.Close(); err != nil {
+				log.Warnf("log: failed to close gcs client: %s\n", err.Error())
+			}
+			return nil
 		}
 		return logGCSRepo
 	}
 	return nil
 }
🧹 Nitpick comments (1)
api/internal/app/config/log.go (1)

5-24: Add field documentation and enhance password validation.

The Redis configuration implementation looks good, but could benefit from additional documentation and validation.

 type RedisLogConfig struct {
+	// Addr is the Redis server address in format host:port
+	// Example: localhost:6379 or redis://localhost:6379
 	Addr     string `pp:",omitempty"`
+	// Password is the optional Redis server password
 	Password string `pp:",omitempty"`
+	// DB is the Redis database number (0-15)
 	DB       int    `pp:",omitempty"`
 }

 func (r RedisLogConfig) IsConfigured() bool {
 	if r.Addr == "" {
 		return false
 	}
 	if r.DB < 0 || r.DB > 15 {
 		return false
 	}
+	// Validate password if provided
+	if r.Password != "" && len(r.Password) < 8 {
+		return false
+	}
 	// Basic format validation for Redis address
 	// Example: localhost:6379 or redis://localhost:6379
 	if !strings.Contains(r.Addr, ":") {
 		return false
 	}
 	return true
 }
📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 0f49a03 and d85eaee.

📒 Files selected for processing (2)
  • api/internal/app/config/log.go (1 hunks)
  • api/internal/app/repo.go (3 hunks)
🔇 Additional comments (1)
api/internal/app/repo.go (1)

6-7: Consider error handling for log initialization failures.

While the log initialization looks good, consider handling the case where both Redis and GCS log initialization fail.

 	// Log
 	gateways.LogRedis = initLogRedis(ctx, conf)
 	gateways.LogGCS = initLogGCS(ctx, conf)
+	if gateways.LogRedis == nil && gateways.LogGCS == nil {
+		log.Warnf("log: both Redis and GCS log initialization failed")
+	}

Also applies to: 73-75

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Nitpick comments (3)
api/internal/adapter/gql/gqlmodel/models_gen.go (1)

219-226: Consider implementing the Node interface for consistency.

For consistency with other entities in the codebase (e.g., Asset, Deployment, Job), consider implementing the Node interface for the Log struct:

 type Log struct {
+	ID         ID        `json:"id"`
 	WorkflowID ID        `json:"workflowId"`
 	JobID      ID        `json:"jobId"`
 	NodeID     *ID       `json:"nodeId,omitempty"`
 	Timestamp  time.Time `json:"timestamp"`
 	LogLevel   LogLevel  `json:"logLevel"`
 	Message    string    `json:"message"`
 }
+
+func (Log) IsNode()        {}
+func (this Log) GetID() ID { return this.ID }
api/internal/infrastructure/gcs/real_gcs.go (2)

12-22: Add godoc comments for better documentation.

While the implementation is correct, adding godoc comments would improve code documentation and help other developers understand the purpose and usage of these types and methods.

Add documentation comments like this:

+// realGCSClient implements GCSClient interface using Google Cloud Storage.
 type realGCSClient struct {
 	client *storage.Client
 }

+// NewRealGCSClient creates a new GCSClient with the provided storage.Client.
 func NewRealGCSClient(client *storage.Client) GCSClient {
 	return &realGCSClient{client: client}
 }

+// Bucket returns a GCSBucket interface for the specified bucket name.
 func (r *realGCSClient) Bucket(name string) GCSBucket {
 	return &realGCSBucket{bucket: r.client.Bucket(name)}
 }

28-46: Consider performance optimizations and context handling.

While the implementation is functionally correct, consider these improvements:

  1. Pre-allocate the slice to improve performance when dealing with many objects
  2. Consider adding a context timeout to prevent long-running operations

Here's a suggested implementation:

 func (b *realGCSBucket) ListObjects(ctx context.Context, prefix string) ([]string, error) {
-	var names []string
+	names := make([]string, 0, 100) // Pre-allocate with a reasonable capacity
+
+	// Add timeout if not already set in context
+	ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
+	defer cancel()
 
 	it := b.bucket.Objects(ctx, &storage.Query{
 		Prefix: prefix,
 	})
📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between d85eaee and 39e5c8b.

📒 Files selected for processing (5)
  • api/internal/adapter/gql/generated.go (14 hunks)
  • api/internal/adapter/gql/gqlmodel/models_gen.go (2 hunks)
  • api/internal/adapter/gql/resolver_query.go (2 hunks)
  • api/internal/infrastructure/gcs/real_gcs.go (1 hunks)
  • api/internal/infrastructure/redis/log_test.go (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
  • api/internal/adapter/gql/resolver_query.go
  • api/internal/infrastructure/redis/log_test.go
🔇 Additional comments (5)
api/internal/adapter/gql/gqlmodel/models_gen.go (2)

613-658: LGTM! Well-structured log level implementation.

The LogLevel type implementation:

  • Follows a consistent pattern with other enum types
  • Uses standard log severity levels
  • Includes proper GraphQL marshaling support

Line range hint 1-1: Verify GraphQL schema changes.

Since this is a generated file, ensure that the corresponding changes in the GraphQL schema (api/gql/log.graphql) properly define:

  1. The Log type with all fields
  2. The LogLevel enum with all values
  3. Any queries or mutations that use these types
✅ Verification successful

GraphQL schema changes are properly defined ✓

The schema in api/gql/log.graphql correctly defines all the necessary types and fields that match the generated code, including:

  • Complete LogLevel enum
  • Well-structured Log type with all required fields
  • Properly defined logs query with required parameters
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check if the GraphQL schema properly defines the Log types
cat api/gql/log.graphql

Length of output: 289

api/internal/adapter/gql/generated.go (2)

2399-2418: Well-structured GraphQL schema for logging system!

The Log type and LogLevel enum are well-defined with:

  • Comprehensive log levels (ERROR to TRACE)
  • Required fields marked as non-null
  • Proper use of DateTime scalar for timestamp
  • Clear and consistent naming

20467-20509: Excellent performance optimization using concurrent marshaling!

The implementation efficiently handles array marshaling by:

  • Using goroutines for parallel processing when array length > 1
  • Optimizing single-element arrays by avoiding goroutine overhead
  • Proper synchronization using WaitGroup
  • Thorough error handling with panic recovery
api/internal/infrastructure/gcs/real_gcs.go (1)

1-10: LGTM! Well-structured package with appropriate imports.

The package structure is clean and imports are properly organized with the required dependencies.

api/internal/infrastructure/gcs/real_gcs.go Outdated Show resolved Hide resolved
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (3)
api/internal/usecase/interactor/log_test.go (3)

15-22: Enhance mock implementation for better testing.

Consider improving the mock implementation by:

  1. Adding method call tracking to verify behavior
  2. Implementing compile-time interface validation
+var _ usecase.LogGateway = (*mockLogGateway)(nil)  // Compile-time interface validation

 type mockLogGateway struct {
 	logs []*log.Log
 	err  error
+	calls []struct {
+		since time.Time
+		until time.Time
+		workflowID id.WorkflowID
+		jobID id.JobID
+	}
 }

 func (m *mockLogGateway) GetLogs(ctx context.Context, since time.Time, until time.Time, workflowID id.WorkflowID, jobID id.JobID) ([]*log.Log, error) {
+	m.calls = append(m.calls, struct {
+		since time.Time
+		until time.Time
+		workflowID id.WorkflowID
+		jobID id.JobID
+	}{since, until, workflowID, jobID})
 	return m.logs, m.err
 }

24-41: Add missing test cases for constructor edge cases.

Consider adding the following test cases:

  1. Constructor behavior with nil gateways
  2. Zero threshold value handling
 func TestNewLogInteractor(t *testing.T) {
     // ... existing tests ...

+    t.Run("nil gateways should still construct", func(t *testing.T) {
+        li := NewLogInteractor(nil, nil, 10*time.Minute)
+        assert.NotNil(t, li)
+    })
+
+    t.Run("zero threshold should default to 60 minutes", func(t *testing.T) {
+        redisMock := &mockLogGateway{}
+        gcsMock := &mockLogGateway{}
+        li := NewLogInteractor(redisMock, gcsMock, 0)
+        logi := li.(*LogInteractor)
+        assert.Equal(t, 60*time.Minute, logi.recentLogsThreshold)
+    })
 }

43-121: Add test cases for missing scenarios.

Consider adding the following test cases:

  1. Validation of the 'until' parameter
  2. Context cancellation handling
  3. Edge case where 'since' equals the threshold boundary
 func TestLogInteractor_GetLogs(t *testing.T) {
     // ... existing setup ...

+    t.Run("respect until parameter", func(t *testing.T) {
+        li := NewLogInteractor(redisMock, gcsMock, 1*time.Hour)
+        now := time.Date(2025, 1, 1, 12, 0, 0, 0, time.UTC)
+        since := now.Add(-30 * time.Minute)
+        until := now.Add(-15 * time.Minute)
+        out, err := li.GetLogs(context.Background(), since, until, id.NewWorkflowID(), id.NewJobID(), &usecase.Operator{})
+        assert.NoError(t, err)
+        assert.Equal(t, redisLogs, out)
+    })
+
+    t.Run("handle context cancellation", func(t *testing.T) {
+        li := NewLogInteractor(redisMock, gcsMock, 1*time.Hour)
+        ctx, cancel := context.WithCancel(context.Background())
+        cancel()
+        out, err := li.GetLogs(ctx, time.Now(), id.NewWorkflowID(), id.NewJobID(), &usecase.Operator{})
+        assert.Error(t, err)
+        assert.True(t, errors.Is(err, context.Canceled))
+    })
+
+    t.Run("exactly at threshold boundary", func(t *testing.T) {
+        li := NewLogInteractor(redisMock, gcsMock, 1*time.Hour)
+        now := time.Date(2025, 1, 1, 12, 0, 0, 0, time.UTC)
+        since := now.Add(-1 * time.Hour)
+        out, err := li.GetLogs(context.Background(), since, id.NewWorkflowID(), id.NewJobID(), &usecase.Operator{})
+        assert.NoError(t, err)
+        // Verify which gateway was used
+        assert.Equal(t, redisLogs, out)
+    })
 }
📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 901c635 and bcabdc3.

📒 Files selected for processing (6)
  • api/go.mod (2 hunks)
  • api/internal/app/config/log.go (1 hunks)
  • api/internal/infrastructure/gcs/log.go (1 hunks)
  • api/internal/infrastructure/redis/log.go (1 hunks)
  • api/internal/usecase/interactor/common.go (2 hunks)
  • api/internal/usecase/interactor/log_test.go (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (5)
  • api/internal/usecase/interactor/common.go
  • api/go.mod
  • api/internal/app/config/log.go
  • api/internal/infrastructure/redis/log.go
  • api/internal/infrastructure/gcs/log.go
🔇 Additional comments (1)
api/internal/usecase/interactor/log_test.go (1)

43-54: Make tests deterministic by using fixed timestamps.

The current implementation uses time.Now() which can make tests flaky.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

♻️ Duplicate comments (1)
api/internal/app/repo.go (1)

130-146: 🛠️ Refactor suggestion

Add Redis connection health check and cleanup.

The Redis client initialization should include:

  1. Connection health check using Ping
  2. Connection timeout configuration
  3. Proper cleanup when context is done
🧹 Nitpick comments (2)
api/internal/app/config/log.go (2)

5-9: Add field documentation and consider password security.

The struct fields should be documented to explain their purpose and requirements. Also, consider adding password validation.

 type RedisLogConfig struct {
+	// Addr is the Redis server address in format host:port
+	// Example: localhost:6379 or redis://localhost:6379
 	Addr     string `pp:",omitempty"`
+	// Password is the Redis server authentication password (optional)
+	// For security, ensure this is stored securely and not in plain text
 	Password string `pp:",omitempty"`
+	// DB is the Redis database number (0-15)
 	DB       int    `pp:",omitempty"`
 }

11-22: Enhance Redis address validation.

While the current validation is good, consider adding more robust address validation using regex to ensure proper format.

+import "regexp"
+
 func (r RedisLogConfig) IsConfigured() bool {
 	if r.Addr == "" {
 		return false
 	}
 	if r.DB < 0 || r.DB > 15 {
 		return false
 	}
-	if !strings.Contains(r.Addr, ":") {
+	// Validate Redis address format: host:port or redis://host:port
+	redisAddrPattern := `^(redis://)?([a-zA-Z0-9.-]+):(\d{1,5})$`
+	if !regexp.MustCompile(redisAddrPattern).MatchString(r.Addr) {
 		return false
 	}
 	return true
 }
📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between bcabdc3 and 53bc817.

📒 Files selected for processing (8)
  • api/.env.example (1 hunks)
  • api/internal/app/config/config.go (1 hunks)
  • api/internal/app/config/log.go (1 hunks)
  • api/internal/app/repo.go (3 hunks)
  • api/internal/usecase/gateway/container.go (1 hunks)
  • api/internal/usecase/interactor/common.go (1 hunks)
  • api/internal/usecase/interactor/log.go (1 hunks)
  • api/internal/usecase/interactor/log_test.go (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (5)
  • api/internal/usecase/gateway/container.go
  • api/internal/app/config/config.go
  • api/.env.example
  • api/internal/usecase/interactor/common.go
  • api/internal/usecase/interactor/log_test.go
🔇 Additional comments (3)
api/internal/app/config/log.go (1)

1-4: LGTM!

Package name and imports are appropriate for the functionality.

api/internal/usecase/interactor/log.go (1)

16-18: 🛠️ Refactor suggestion

Consider adding GCS gateway field.

The struct only has Redis gateway but based on the PR objectives, GCS integration is also planned. Consider adding the GCS gateway field.

 type LogInteractor struct {
 	logsGatewayRedis gateway.Log
+	logsGatewayGCS   gateway.Log
 }

Likely invalid or redundant comment.

api/internal/app/repo.go (1)

72-73: Consider initializing GCS log gateway.

Based on the PR objectives, GCS integration is planned but not initialized here.

api/internal/usecase/interactor/log.go Show resolved Hide resolved
api/internal/usecase/interactor/log.go Outdated Show resolved Hide resolved
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (1)
api/internal/adapter/gql/generated.go (1)

148-148: Consider adding pagination to the logs query.

The logs query might return a large number of results, which could impact performance. Consider implementing pagination similar to how it's done for the jobs query.

Apply this diff to add pagination to the schema:

-  logs(since: DateTime!, jobId: ID!): [Log]
+  logs(since: DateTime!, jobId: ID!, pagination: Pagination): LogConnection!

+type LogConnection {
+  edges: [LogEdge!]!
+  pageInfo: PageInfo!
+}

+type LogEdge {
+  node: Log!
+  cursor: String!
+}

Also applies to: 171-177, 280-280

📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between a6ea63c and 78c75b5.

📒 Files selected for processing (2)
  • api/gql/job.graphql (1 hunks)
  • api/internal/adapter/gql/generated.go (24 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • api/gql/job.graphql
🔇 Additional comments (4)
api/internal/adapter/gql/generated.go (4)

2404-2423: LGTM! Well-structured GraphQL schema for logs.

The Log type and LogLevel enum are well-designed with appropriate field requirements and standard log levels.


11076-11138: LGTM! Well-implemented query resolvers with proper error handling.

The logs query resolver and field contexts are well-implemented with comprehensive error handling and middleware integration.


17579-17633: LGTM! Robust Log type implementation with proper null handling.

The Log type implementation includes proper null handling for required fields and comprehensive error recovery.


21631-21677: LGTM! Efficient marshaling implementation with proper concurrency handling.

The marshaling implementation efficiently handles arrays with proper concurrency and error recovery.

@soma00333 soma00333 changed the title feat(api): Add Logs() to extract realtime logging feat(api): Add real-time logging functionality using Redis Feb 1, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants