-
Notifications
You must be signed in to change notification settings - Fork 2
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(api): Add real-time logging functionality using Redis #780
base: main
Are you sure you want to change the base?
Conversation
WalkthroughThis pull request introduces a comprehensive logging system for the ReEarth Flow application, leveraging Redis for log storage and retrieval. The changes span multiple packages and include GraphQL schema modifications, infrastructure implementations, and test coverage. The new logging functionality allows retrieving logs for specific workflows and jobs, with support for different log levels (ERROR, WARN, INFO, DEBUG, TRACE) and flexible querying capabilities. Changes
Suggested labels
Suggested reviewers
Poem
✨ Finishing Touches
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
Documentation and Community
|
✅ Deploy Preview for reearth-flow canceled.
|
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #780 +/- ##
==========================================
- Coverage 25.42% 23.04% -2.38%
==========================================
Files 137 143 +6
Lines 7713 7893 +180
==========================================
- Hits 1961 1819 -142
- Misses 5558 5920 +362
+ Partials 194 154 -40
Flags with carried forward coverage won't be shown. Click here to find out more.
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 16
🧹 Nitpick comments (11)
api/internal/adapter/gql/generated.go (2)
2336-2356
: DefiningLog
type andlogs
query in GraphQL schemaThe
Log
type, along with theLogLevel
enum, is well-defined in the GraphQL schema. Thelogs
query is properly added, acceptingsince
,workflowId
, andjobId
as non-nullable arguments, and returning a non-nullable list ofLog
objects.Consider making
jobId
argument optional in thelogs
queryTo enhance flexibility, consider making the
jobId
argument optional. This would allow users to retrieve logs for an entire workflow without specifying a specific job, which could be useful in scenarios where logs need to be analyzed at the workflow level.
2355-2355
: Add pagination parameters tologs
queryTo handle large volumes of logs and improve performance, consider adding pagination parameters (
first
,last
,after
,before
) to thelogs
query. This will help manage the size of the data returned and provide better control to the clients consuming the API.api/internal/usecase/gateway/container.go (1)
7-8
: Consider renaming fields for better clarity and separation of concerns.The current field names
LogRedis
andLogGCS
tightly couple the interface to specific implementations. Consider more abstract names likePrimaryLogger
andSecondaryLogger
orShortTermLogger
andLongTermLogger
to better reflect their roles rather than their implementations.api/internal/usecase/gateway/log.go (1)
11-13
: Consider enhancing the Log interface for better scalability and clarity.
- The interface could benefit from pagination support to handle large volumes of logs efficiently.
- The
time.Time
parameter's purpose is ambiguous - consider adding a comment or renaming it to clarify if it's a start time, end time, or exact timestamp.- Consider adding filtering options (e.g., log levels, time ranges) for more flexible log retrieval.
Example enhancement:
type LogFilter struct { Since time.Time Until time.Time LogLevels []LogLevel Pagination *PaginationOptions } type Log interface { GetLogs(ctx context.Context, filter LogFilter, workflowID id.WorkflowID, jobID id.JobID) ([]*log.Log, *PaginationResult, error) }api/internal/adapter/gql/loader_log.go (2)
20-32
: Add error wrapping for better error context.The error handling could be improved by wrapping errors with additional context about the failure point.
func (l *LogLoader) GetLogs(ctx context.Context, since time.Time, workflowID gqlmodel.ID, jobID gqlmodel.ID) ([]*gqlmodel.Log, error) { newJobID, err := id.JobIDFrom(string(jobID)) if err != nil { - return nil, err + return nil, fmt.Errorf("invalid job ID: %w", err) } newWorkflowID, err := id.WorkflowIDFrom(string(workflowID)) if err != nil { - return nil, err + return nil, fmt.Errorf("invalid workflow ID: %w", err) } res, err := l.usecase.GetLogs(ctx, since, newWorkflowID, newJobID, getOperator(ctx)) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to get logs: %w", err) }
34-38
: Consider using slice preallocation for better performance.The slice preallocation is good, but consider returning early for empty results to avoid unnecessary allocations.
+ if len(res) == 0 { + return nil, nil + } logs := make([]*gqlmodel.Log, 0, len(res)) for _, log := range res { logs = append(logs, gqlmodel.ToLog(log)) } return logs, nilapi/pkg/log/log.go (1)
15-22
: Consider adding validation methods for required fields.The Log struct should have methods to validate required fields and handle empty messages.
+func (l *Log) Validate() error { + if l.workflowID == "" || l.jobID == "" { + return fmt.Errorf("workflowID and jobID are required") + } + if l.message == "" { + return fmt.Errorf("log message cannot be empty") + } + return nil +}api/internal/app/repo.go (1)
73-75
: Consider conditional initialization of log backends.Both Redis and GCS log backends are initialized unconditionally. This could lead to unnecessary resource usage if not all backends are needed.
Consider initializing only the configured backends and storing them in a slice:
- gateways.LogRedis = initLogRedis(ctx, conf) - gateways.LogGCS = initLogGCS(ctx, conf) + var logBackends []gateway.Log + if redisLog := initLogRedis(ctx, conf); redisLog != nil { + logBackends = append(logBackends, redisLog) + } + if gcsLog := initLogGCS(ctx, conf); gcsLog != nil { + logBackends = append(logBackends, gcsLog) + } + gateways.Logs = logBackendsapi/internal/adapter/gql/resolver_query.go (1)
53-55
: Consider adding pagination and time range validation.The
Logs
query might return large result sets and could benefit from pagination. Additionally, consider validating the time range to prevent excessive queries.Consider these improvements:
- Add pagination support:
-func (r *queryResolver) Logs(ctx context.Context, since time.Time, workflowId gqlmodel.ID, jobId gqlmodel.ID) ([]*gqlmodel.Log, error) { +func (r *queryResolver) Logs(ctx context.Context, since time.Time, workflowId gqlmodel.ID, jobId gqlmodel.ID, pagination *gqlmodel.Pagination) (*gqlmodel.LogConnection, error) {
- Add time range validation:
func (r *queryResolver) Logs(ctx context.Context, since time.Time, workflowId gqlmodel.ID, jobId gqlmodel.ID) ([]*gqlmodel.Log, error) { + maxRange := 24 * time.Hour + if time.Since(since) > maxRange { + return nil, fmt.Errorf("time range cannot exceed %v", maxRange) + } return loaders(ctx).Log.GetLogs(ctx, since, workflowId, jobId) }api/gql/log.graphql (2)
1-7
: Add documentation for the LogLevel enum.Consider adding descriptions for the enum and its values to improve API documentation.
Apply this diff:
+""" +Represents the severity level of a log entry. +""" enum LogLevel { - ERROR - WARN - INFO - DEBUG - TRACE + """System is unusable or a critical error occurred.""" + ERROR + """Warning messages for potentially harmful situations.""" + WARN + """General informational messages about system operation.""" + INFO + """Detailed messages useful for debugging.""" + DEBUG + """Very detailed messages for tracing program execution.""" + TRACE }
18-20
: Consider making workflowId and jobId optional.Making these parameters optional would allow querying logs across all workflows or jobs within a time range.
Apply this diff:
extend type Query { - logs(since: DateTime!, workflowId: ID!, jobId: ID!): [Log!]! + """ + Retrieve logs within a time range, optionally filtered by workflow and job. + """ + logs( + """Start time for the log query.""" + since: DateTime!, + """Filter logs by workflow ID.""" + workflowId: ID, + """Filter logs by job ID.""" + jobId: ID + ): [Log!]! }
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (2)
api/go.sum
is excluded by!**/*.sum
go.work.sum
is excluded by!**/*.sum
📒 Files selected for processing (19)
api/go.mod
(2 hunks)api/gql/log.graphql
(1 hunks)api/internal/adapter/gql/generated.go
(16 hunks)api/internal/adapter/gql/gqlmodel/convert_log.go
(1 hunks)api/internal/adapter/gql/gqlmodel/models_gen.go
(3 hunks)api/internal/adapter/gql/loader.go
(2 hunks)api/internal/adapter/gql/loader_log.go
(1 hunks)api/internal/adapter/gql/resolver_query.go
(2 hunks)api/internal/app/repo.go
(3 hunks)api/internal/infrastructure/gcs/log.go
(1 hunks)api/internal/infrastructure/redis/log.go
(1 hunks)api/internal/usecase/gateway/container.go
(1 hunks)api/internal/usecase/gateway/log.go
(1 hunks)api/internal/usecase/interactor/common.go
(1 hunks)api/internal/usecase/interactor/log.go
(1 hunks)api/internal/usecase/interfaces/common.go
(1 hunks)api/internal/usecase/interfaces/log.go
(1 hunks)api/pkg/log/id.go
(1 hunks)api/pkg/log/log.go
(1 hunks)
✅ Files skipped from review due to trivial changes (1)
- api/pkg/log/id.go
🔇 Additional comments (14)
api/internal/adapter/gql/generated.go (9)
170-177
: Addition ofLog
struct inComplexityRoot
The
Log
struct has been correctly added toComplexityRoot
to define complexity functions for each field in theLog
type. This ensures that complexity analysis for theLog
fields is properly handled.
280-280
: Addition ofLogs
function inComplexityRoot.Query
The
Logs
function is appropriately added to theQuery
struct withinComplexityRoot
to calculate the complexity of thelogs
query, accommodating the new parameterssince
,workflowID
, andjobID
.
421-421
: AddingLogs
method toQueryResolver
interfaceThe
Logs
method is correctly added to theQueryResolver
interface, allowing for the retrieval of logs based on the specified parameters. This enhances the GraphQL API's capability to fetch logs.
847-888
: Implementing complexity estimations forLog
fieldsComplexity functions for each field in the
Log
type have been properly implemented in the switch cases. This ensures accurate complexity calculations during query execution, which is essential for optimizing GraphQL performance.
1591-1602
: Handling arguments inQuery.logs
complexity functionThe
field_Query_logs_args
function is correctly utilized to parse and validate the arguments for thelogs
query within the complexity estimation. Proper error handling is in place to ensure robustness.
3370-3402
: Correct argument handling infield_Query_logs_args
The
field_Query_logs_args
function correctly parses the required arguments for thelogs
query. However, ensure that proper validation is in place for argument types and values to prevent potential runtime errors.
Line range hint
6434-11013
: Implementing resolver methods forLog
fieldsThe resolver methods for the
Log
type fields are correctly implemented, ensuring appropriate data retrieval and error handling. This allows for each field of theLog
type to be properly resolved when queried.
17071-17131
: Marshaling theLog
typeThe marshaling function for the
Log
type is properly implemented to serializeLog
objects in GraphQL responses. This ensures thatLog
data is accurately represented in the API output.
19895-19958
: Marshaling and unmarshaling ofLogLevel
andLog
typesThe marshal and unmarshal functions for both the
LogLevel
enum and theLog
type are correctly implemented. This provides necessary serialization and deserialization support for these types within the GraphQL API.api/internal/usecase/interfaces/log.go (1)
12-14
: Document the relationship between interfaces.Log and gateway.Log.
- The relationship between
interfaces.Log
andgateway.Log
is unclear. Consider adding package documentation explaining:
- Why two similar interfaces exist
- When to use each interface
- The role of the
Operator
parameter- Consider using interface embedding if
interfaces.Log
is meant to extendgateway.Log
.Let's check for any other similar interfaces in the codebase:
✅ Verification successful
Document the clean architecture pattern implemented by Log interfaces
The relationship between
interfaces.Log
andgateway.Log
follows the clean architecture pattern:
gateway.Log
: Infrastructure layer interface for storage implementations (Redis, GCS)interfaces.Log
: Application layer interface adding authorization capability viaOperator
LogInteractor
: Implementation that combines Redis (recent logs) and GCS (older logs)Consider adding package documentation explaining this architecture and the currently unused
Operator
parameter's intended purpose for authorization.🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Search for Log interface definitions rg "type\s+Log\s+interface" -A 3Length of output: 491
Script:
#!/bin/bash # Search for implementations and usages echo "=== Searching for GetLogs implementations ===" ast-grep --pattern 'func $_($_, $_, $_, $_) ([]*log.Log, error)' echo -e "\n=== Searching for interface usages ===" rg "gateway\.Log|interfaces\.Log" -B 2 -A 2 echo -e "\n=== Checking for package documentation ===" fd "^log.go$" --exec head -n 10 {}Length of output: 3480
Script:
#!/bin/bash # Check LogInteractor implementation echo "=== LogInteractor Implementation ===" cat api/internal/usecase/interactor/log.goLength of output: 1069
api/internal/usecase/interfaces/common.go (1)
28-28
: LGTM! Clean addition of Log fieldThe Log field is properly integrated into the Container struct, maintaining consistency with the existing structure.
api/internal/adapter/gql/gqlmodel/models_gen.go (1)
218-225
: LGTM!The implementation of the
Log
struct andLogLevel
type is well-structured and follows standard patterns. The GQL marshaling/unmarshaling implementation is correct.Also applies to: 597-642
api/go.mod (2)
44-45
: LGTM! Indirect dependencies look good.The indirect dependencies
github.com/cespare/xxhash/v2
andgithub.com/dgryski/go-rendezvous
are legitimate Redis client dependencies with appropriate versions.
20-20
:⚠️ Potential issueFix invalid Redis client version.
The specified version
v9.7.0
ofgithub.com/redis/go-redis/v9
does not exist in the official releases. The latest stable version isv9.5.1
.Apply this diff to fix the version:
- github.com/redis/go-redis/v9 v9.7.0 + github.com/redis/go-redis/v9 v9.5.1Run the following script to verify the Redis client version and check for known vulnerabilities:
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (2)
api/internal/infrastructure/gcs/log.go (1)
26-26
: Enhance TODO comment with implementation details.The TODO comment should outline the implementation steps for better clarity.
- // TODO: Implement + // TODO: Implement GCS log retrieval + // 1. Use client.Bucket to get bucket handle + // 2. List objects with prefix matching workflowID/jobID + // 3. Filter objects modified after 'since' timestamp + // 4. Read and parse log entries from matching objects + // 5. Return parsed logsapi/internal/infrastructure/redis/log.go (1)
27-27
: Enhance TODO comment with implementation details.The TODO comment should outline the implementation steps for better clarity.
- // TODO: Implement + // TODO: Implement Redis log retrieval + // 1. Construct Redis key pattern using workflowID/jobID + // 2. Use ZRANGEBYSCORE to get logs after 'since' timestamp + // 3. Deserialize and parse log entries + // 4. Return parsed logs
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
api/internal/app/repo.go
(3 hunks)api/internal/infrastructure/gcs/log.go
(1 hunks)api/internal/infrastructure/redis/log.go
(1 hunks)
🔇 Additional comments (5)
api/internal/infrastructure/gcs/log.go (1)
25-34
:⚠️ Potential issueFix GetLogs implementation to use provided parameters.
The current implementation generates new IDs instead of using the provided
workflowID
andjobID
parameters. This will make it impossible to retrieve logs for specific workflows and jobs.Apply this diff to fix the implementation:
func (g *gcsLog) GetLogs(ctx context.Context, since time.Time, workflowID id.WorkflowID, jobID id.JobID) ([]*log.Log, error) { // TODO: Implement nodeID := log.NodeID(id.NewNodeID()) dummyLogs := []*log.Log{ - log.NewLog(id.NewWorkflowID(), id.NewJobID(), nil, log.LevelInfo, "Test log message 1 from gcs"), - log.NewLog(id.NewWorkflowID(), id.NewJobID(), &nodeID, log.LevelDebug, "Test log message 2 from gcs"), + log.NewLog(workflowID, jobID, nil, log.LevelInfo, "Test log message 1 from gcs"), + log.NewLog(workflowID, jobID, &nodeID, log.LevelDebug, "Test log message 2 from gcs"), } return dummyLogs, nil }Likely invalid or redundant comment.
api/internal/infrastructure/redis/log.go (1)
26-35
:⚠️ Potential issueFix receiver name and GetLogs implementation.
- The receiver variable 'g' should be 'r' to match the redisLog type
- The implementation generates new IDs instead of using the provided parameters
Apply this diff to fix the implementation:
-func (g *redisLog) GetLogs(ctx context.Context, since time.Time, workflowID id.WorkflowID, jobID id.JobID) ([]*log.Log, error) { +func (r *redisLog) GetLogs(ctx context.Context, since time.Time, workflowID id.WorkflowID, jobID id.JobID) ([]*log.Log, error) { // TODO: Implement nodeID := log.NodeID(id.NewNodeID()) dummyLogs := []*log.Log{ - log.NewLog(id.NewWorkflowID(), id.NewJobID(), nil, log.LevelInfo, "Test log message 1 from redis"), - log.NewLog(id.NewWorkflowID(), id.NewJobID(), &nodeID, log.LevelDebug, "Test log message 2 from redis"), + log.NewLog(workflowID, jobID, nil, log.LevelInfo, "Test log message 1 from redis"), + log.NewLog(workflowID, jobID, &nodeID, log.LevelDebug, "Test log message 2 from redis"), } return dummyLogs, nil }Likely invalid or redundant comment.
api/internal/app/repo.go (3)
132-141
: 🛠️ Refactor suggestionAdd Redis connection validation.
The Redis client creation should validate the connection by performing a PING command.
func initLogRedis(ctx context.Context, conf *config.Config) gateway.Log { + if !conf.Redis.IsConfigured() { + return nil + } client := redis.NewClient(&redis.Options{ Addr: conf.Redis.Addr, + Password: conf.Redis.Password, + DB: conf.Redis.DB, }) + if err := client.Ping(ctx).Err(); err != nil { + log.Fatalf("Failed to connect to Redis: %v", err) + } logRedisRepo, err := redisrepo.NewRedisLog(client) if err != nil { log.Fatalf("Failed to create redis log repository: %v", err) } return logRedisRepo }Likely invalid or redundant comment.
133-135
:⚠️ Potential issueReplace hardcoded Redis configuration with values from config.
The Redis connection string is hardcoded to "localhost:6379". This should be configurable through the application's configuration system.
client := redis.NewClient(&redis.Options{ - Addr: "localhost:6379", + Addr: conf.Redis.Addr, + Password: conf.Redis.Password, + DB: conf.Redis.DB, })Likely invalid or redundant comment.
143-153
: 🛠️ Refactor suggestionAdd GCS configuration validation.
The GCS client initialization should check if GCS logging is enabled in the configuration.
func initLogGCS(ctx context.Context, conf *config.Config) gateway.Log { + if !conf.GCS.IsConfigured() { + return nil + } gcsClient, err := storage.NewClient(ctx) if err != nil { log.Fatalf("Failed to create gcs client: %v", err) } - logGCSRepo, err := gcs.NewGCSLog(gcsClient) + logGCSRepo, err := gcs.NewGCSLog(gcsClient, conf.GCS.LogBucket) if err != nil { log.Fatalf("Failed to create gcs log repository: %v", err) } return logGCSRepo }Likely invalid or redundant comment.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🧹 Nitpick comments (2)
api/internal/usecase/interactor/log.go (2)
25-27
: Define default threshold as a package constant.The default threshold of 60 minutes should be defined as a package constant for better maintainability and reusability.
+const defaultRecentLogsThreshold = 60 * time.Minute func NewLogInteractor(lgRedis gateway.Log, lgGCS gateway.Log, recentLogsThreshold time.Duration) interfaces.Log { // ... if recentLogsThreshold <= 0 { - recentLogsThreshold = 60 * time.Minute + recentLogsThreshold = defaultRecentLogsThreshold }
37-39
: Define query timeout as a package constant.The 30-second timeout duration should be defined as a package constant for better maintainability.
+const defaultQueryTimeout = 30 * time.Second func (li *LogInteractor) GetLogs(...) ([]*log.Log, error) { // Add timeout to prevent long-running queries - ctx, cancel := context.WithTimeout(ctx, 30*time.Second) + ctx, cancel := context.WithTimeout(ctx, defaultQueryTimeout)
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
api/internal/infrastructure/redis/log.go
(1 hunks)api/internal/usecase/interactor/common.go
(2 hunks)api/internal/usecase/interactor/log.go
(1 hunks)api/pkg/log/log.go
(1 hunks)
🚧 Files skipped from review as they are similar to previous changes (3)
- api/internal/usecase/interactor/common.go
- api/internal/infrastructure/redis/log.go
- api/pkg/log/log.go
🧰 Additional context used
🪛 GitHub Actions: ci
api/internal/usecase/interactor/log.go
[error] 23-23: Panic: Log gateways cannot be nil
🔇 Additional comments (3)
api/internal/usecase/interactor/log.go (3)
15-19
: LGTM! Well-structured struct definition.The LogInteractor struct is well-designed with clear field names and appropriate types. The recentLogsThreshold field has been added as requested.
40-51
: LGTM! Well-implemented log retrieval logic.The implementation correctly:
- Uses the configured threshold to choose between Redis and GCS
- Properly handles errors with descriptive messages
- Returns early on Redis success
36-36
: 🛠️ Refactor suggestionRemove unused operator parameter.
The operator parameter is not used in the method implementation and should be removed.
-func (li *LogInteractor) GetLogs(ctx context.Context, since time.Time, workflowID id.WorkflowID, jobID id.JobID, operator *usecase.Operator) ([]*log.Log, error) { +func (li *LogInteractor) GetLogs(ctx context.Context, since time.Time, workflowID id.WorkflowID, jobID id.JobID) ([]*log.Log, error) {Likely invalid or redundant comment.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
♻️ Duplicate comments (2)
api/pkg/log/log.go (2)
7-13
: 🛠️ Refactor suggestionAdd an
IsValid
method to validate log levels.This suggestion duplicates a past comment about preventing invalid log levels. Please consider implementing an
IsValid
check in theLevel
type to ensure correctness and consistency throughout the system.func (l Level) IsValid() bool { switch l { case LevelError, LevelWarn, LevelInfo, LevelDebug, LevelTrace: return true default: return false } }
24-33
: 🛠️ Refactor suggestionValidate log level in the constructor.
This is also a duplicate request from a previous review to ensure that invalid log levels default to
LevelInfo
. Consider updating the constructor with validation logic to avoid confusion or errors down the line.func NewLog(workflowID WorkflowID, jobID JobID, nodeID *NodeID, time time.Time, level Level, message string) *Log { + if !level.IsValid() { + level = LevelInfo + } return &Log{ workflowID: workflowID, jobID: jobID, nodeID: nodeID, timestamp: time, level: level, message: message, } }
🧹 Nitpick comments (6)
api/internal/adapter/gql/loader_log_test.go (4)
17-19
: Add a brief comment explaining the purpose of this mock.
Consider adding a short docstring or comment to help clarify that this mock is used to test theLogUsecase
methods without interfacing with actual external resources.
21-24
: Safeguard against potential type assertion issues.
When retrievingargs.Get(0)
as[]*log.Log
, consider checking for successful type conversion to avoid unexpected panics if the mock isn't set up correctly.
26-45
: Verify log content in addition to log count.
Currently, the test only checks that the returned array has the same length as the mocked data. Consider verifying the actual log messages or levels to ensure full correctness.
47-61
: Verify the specific error message.
While you assert an error occurs, you might also confirm that the returned error message matches “usecase error.” This approach offers stronger validation of the error-handling path.api/internal/usecase/interactor/common.go (1)
31-31
: Make log retention duration configurable.The 60-minute duration is hardcoded. Consider making it configurable through
ContainerConfig
for better flexibility.Apply this diff to make the duration configurable:
type ContainerConfig struct { SignupSecret string AuthSrvUIDomain string + LogRetention time.Duration } func NewContainer(...) interfaces.Container { job := NewJob(r, g) - log, err := NewLogInteractor(g.LogRedis, g.LogGCS, 60*time.Minute) + logRetention := config.LogRetention + if logRetention == 0 { + logRetention = 60 * time.Minute // default value + } + log, err := NewLogInteractor(g.LogRedis, g.LogGCS, logRetention)api/internal/adapter/gql/gqlmodel/convert_log_test.go (1)
17-32
: Consider using a fixed timestamp for test reproducibility.Currently, the test uses
time.Now()
, which can cause slight variations in the timestamp. Using a fixed timestamp would improve test consistency and reduce flakiness.- d := log.NewLog(wfid, jid, &nid, time.Now(), log.LevelInfo, "message") + fixedTime := time.Date(2025, 1, 1, 0, 0, 0, 0, time.UTC) + d := log.NewLog(wfid, jid, &nid, fixedTime, log.LevelInfo, "message")
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (7)
api/internal/adapter/gql/gqlmodel/convert_log_test.go
(1 hunks)api/internal/adapter/gql/loader_log_test.go
(1 hunks)api/internal/infrastructure/gcs/log.go
(1 hunks)api/internal/infrastructure/redis/log.go
(1 hunks)api/internal/usecase/interactor/common.go
(2 hunks)api/internal/usecase/interactor/log.go
(1 hunks)api/pkg/log/log.go
(1 hunks)
🚧 Files skipped from review as they are similar to previous changes (3)
- api/internal/infrastructure/redis/log.go
- api/internal/infrastructure/gcs/log.go
- api/internal/usecase/interactor/log.go
🔇 Additional comments (3)
api/internal/usecase/interactor/common.go (2)
5-5
: LGTM!The
time
package import is correctly added and necessary for the log duration configuration.
31-34
: Backend failure handling needs improvement.The LogInteractor initialization lacks error handling and fallback mechanisms for backend failures.
api/internal/adapter/gql/gqlmodel/convert_log_test.go (1)
12-16
: Great coverage of nil input case.Testing the
nil
input scenario ensures robust handling of edge cases. Having a dedicated test for it is a good practice and prevents potential runtime errors.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (2)
api/pkg/log/log_test.go (2)
10-38
: Well-structured test for log creation.The overall logic for verifying each field of the newly created log instance looks clear and comprehensive. However, consider the implications of using
time.Now()
directly in tests, as minor clock skews or prolonged test runs might introduce a delay. Although unlikely, you could improve test stability by using a fixedtime.Time
value (e.g. viatime.Date(...)
) for extra consistency.
40-61
: Recommend adding a test case for nil NodeID.Since the code passes a pointer for NodeID, it might be valuable to verify behavior when the NodeID is intentionally set to nil, if supported by the application logic. This can help ensure stability for logs that don’t specify a node.
+func TestLogWithNilNodeID(t *testing.T) { + workflowID := id.NewWorkflowID() + jobID := id.NewJobID() + // Passing nil for nodeID + log := NewLog(workflowID, jobID, nil, time.Now(), LevelDebug, "Nil node test") + + if log.NodeID() != nil { + t.Errorf("Expected NodeID to be nil, got %v", log.NodeID()) + } +}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (2)
api/internal/usecase/interactor/log_test.go (2)
24-56
: Extensive test coverage for “NewLogInteractor” instantiation.
The sub-tests correctly verify success cases, error handling, and default threshold assignment. As a small improvement, consider usingrequire.NoError
instead ofassert.NoError
in scenarios where subsequent checks depend on the absence of an error.
58-120
: Comprehensive test coverage of log retrieval logic.
All edge cases—redis usage, GCS usage, and error scenarios—are handled thoroughly. If the codebase grows, consider a table-driven testing approach to streamline the repeated patterns in these sub-tests.
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
api/internal/usecase/interactor/log_test.go
(1 hunks)
🔇 Additional comments (2)
api/internal/usecase/interactor/log_test.go (2)
1-14
: All import statements and package definitions look consistent.
No issues identified. The necessary dependencies are properly included, and the setup is straightforward for test functionality.
15-23
: Mock gateway implementation looks solid.
The simplified mock returning static logs or errors is sufficient for testing. No concurrency issues or other concerns noted.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (3)
api/internal/usecase/interactor/log_test.go (3)
15-22
: Enhance mock implementation.Consider adding verification capabilities to the mock to ensure it's called with expected parameters.
Apply this diff:
type mockLogGateway struct { logs []*log.Log err error + calls []struct { + since time.Time + workflowID id.WorkflowID + jobID id.JobID + } } func (m *mockLogGateway) GetLogs(ctx context.Context, since time.Time, workflowID id.WorkflowID, jobID id.JobID) ([]*log.Log, error) { + m.calls = append(m.calls, struct { + since time.Time + workflowID id.WorkflowID + jobID id.JobID + }{since, workflowID, jobID}) return m.logs, m.err }
43-58
: Extract test data setup to constants.Consider extracting test data setup to constants or helper functions for better maintainability.
Add these constants at the package level:
+const ( + testThreshold = time.Hour + testMessage1 = "redis log 1" + testMessage2 = "redis log 2" + testMessage3 = "gcs log 1" + testMessage4 = "gcs log 2" +) + +func createTestLogs(workflowID id.WorkflowID, jobID id.JobID, nodeID *log.NodeID, messages ...string) []*log.Log { + logs := make([]*log.Log, len(messages)) + for i, msg := range messages { + logs[i] = log.NewLog(workflowID, jobID, nodeID, time.Now(), log.LevelInfo, msg) + } + return logs +}
59-101
: Add test case for context timeout.Add a test case to verify that the context timeout is working correctly.
Add this test case:
+ t.Run("context timeout", func(t *testing.T) { + slowMock := &mockLogGateway{ + logs: redisLogs, + err: context.DeadlineExceeded, + } + li := NewLogInteractor(slowMock, gcsMock, 1*time.Hour) + + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Millisecond) + defer cancel() + time.Sleep(2 * time.Millisecond) // Ensure timeout + + out, err := li.GetLogs(ctx, time.Now(), id.NewWorkflowID(), id.NewJobID(), &usecase.Operator{}) + assert.Nil(t, out) + assert.ErrorIs(t, err, context.DeadlineExceeded) + })
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
api/internal/usecase/interactor/common.go
(2 hunks)api/internal/usecase/interactor/log.go
(1 hunks)api/internal/usecase/interactor/log_test.go
(1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- api/internal/usecase/interactor/common.go
🔇 Additional comments (4)
api/internal/usecase/interactor/log.go (3)
15-19
: LGTM! Well-structured LogInteractor.The struct is well-designed with clear separation between Redis and GCS gateways, and includes the configurable threshold as suggested.
21-31
:⚠️ Potential issueAdd validation for nil gateways.
The constructor should validate that neither gateway is nil to prevent runtime panics.
Apply this diff:
-func NewLogInteractor(lgRedis gateway.Log, lgGCS gateway.Log, recentLogsThreshold time.Duration) interfaces.Log { +func NewLogInteractor(lgRedis gateway.Log, lgGCS gateway.Log, recentLogsThreshold time.Duration) (interfaces.Log, error) { + if lgRedis == nil || lgGCS == nil { + return nil, fmt.Errorf("invalid gateways: Redis=%v, GCS=%v", lgRedis != nil, lgGCS != nil) + } if recentLogsThreshold <= 0 { recentLogsThreshold = 60 * time.Minute } return &LogInteractor{ logsGatewayRedis: lgRedis, logsGatewayGCS: lgGCS, recentLogsThreshold: recentLogsThreshold, - } + }, nil }Likely invalid or redundant comment.
33-49
: 🛠️ Refactor suggestionRemove unused operator parameter.
The operator parameter is not used in the implementation and should be removed.
Apply this diff:
-func (li *LogInteractor) GetLogs(ctx context.Context, since time.Time, workflowID id.WorkflowID, jobID id.JobID, operator *usecase.Operator) ([]*log.Log, error) { +func (li *LogInteractor) GetLogs(ctx context.Context, since time.Time, workflowID id.WorkflowID, jobID id.JobID) ([]*log.Log, error) {Likely invalid or redundant comment.
api/internal/usecase/interactor/log_test.go (1)
24-41
: 🛠️ Refactor suggestionAdd test cases for nil gateway validation.
Once the nil gateway validation is implemented in the constructor, add corresponding test cases.
Add this test case:
func TestNewLogInteractor(t *testing.T) { + t.Run("should return error for nil gateways", func(t *testing.T) { + li, err := NewLogInteractor(nil, &mockLogGateway{}, time.Hour) + assert.Error(t, err) + assert.Nil(t, li) + + li, err = NewLogInteractor(&mockLogGateway{}, nil, time.Hour) + assert.Error(t, err) + assert.Nil(t, li) + })Likely invalid or redundant comment.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (11)
api/internal/usecase/gateway/log.go (1)
11-13
: Add interface documentation.Consider adding documentation to describe:
- The purpose and responsibility of the Log interface
- The expected behavior of GetLogs method
- The meaning of since/until parameters (inclusive/exclusive)
+// Log defines the interface for retrieving logs from storage backends. type Log interface { + // GetLogs retrieves logs within the specified time range for a given workflow and job. + // The time range is inclusive of both since and until timestamps. GetLogs(ctx context.Context, since time.Time, until time.Time, workflowID id.WorkflowID, jobID id.JobID) ([]*log.Log, error) }api/internal/infrastructure/gcs/real_gcs.go (2)
27-45
: Consider implementing pagination for large object lists.The current implementation loads all object names into memory. For buckets with many objects, this could consume significant memory. Consider implementing pagination or streaming for large result sets.
47-59
: Add context timeout handling for object reads.The ReadObject method should respect context cancellation and timeouts. Consider checking ctx.Done() in a select statement when reading large objects.
func (b *realGCSBucket) ReadObject(ctx context.Context, objectName string) ([]byte, error) { r, err := b.bucket.Object(objectName).NewReader(ctx) if err != nil { return nil, err } defer r.Close() - data, err := io.ReadAll(r) + // Create a channel for the read operation + dataCh := make(chan []byte) + errCh := make(chan error) + + go func() { + data, err := io.ReadAll(r) + if err != nil { + errCh <- err + return + } + dataCh <- data + }() + + // Wait for either completion or context cancellation + select { + case data := <-dataCh: + return data, nil + case err := <-errCh: + return nil, err + case <-ctx.Done(): + return nil, ctx.Err() + } - if err != nil { - return nil, err - } - return data, nil }api/internal/infrastructure/gcs/log_test.go (2)
53-120
: Add concurrent access test case.The current test suite doesn't verify behavior under concurrent access. Consider adding a test case that simulates multiple goroutines accessing logs simultaneously.
274-295
: Test error message format for date range limit.The test verifies that an error is returned for large date ranges but doesn't validate the exact error message format. Consider adding assertions for a standardized error message format.
api/internal/infrastructure/redis/log_test.go (2)
322-322
: Translate Japanese comment to English.The comment "since と until に一致する2件のみ含まれるはず" should be in English for consistency.
- assert.Len(t, result, 2, "since と until に一致する2件のみ含まれるはず") + assert.Len(t, result, 2, "should only include 2 logs matching since and until timestamps")
331-331
: Translate Japanese comment to English.The comment "until より後なので含まれない" should be in English for consistency.
- assert.False(t, foundMsgs["after until"], "until より後なので含まれない") + assert.False(t, foundMsgs["after until"], "should not include logs after until timestamp")api/internal/infrastructure/gcs/log.go (2)
150-153
: Add context timeout for GCS operations.GCS operations should have a timeout to prevent hanging in case of network issues or slow responses.
// List all objects under the prefix + ctx, cancel := context.WithTimeout(ctx, 30*time.Second) + defer cancel() objectNames, err := bucket.ListObjects(ctx, prefix) if err != nil { return nil, fmt.Errorf("failed to list objects: %w", err) }
156-160
: Add context timeout for object read operations.Similar to listing objects, reading objects should also have a timeout.
+ ctx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() data, err := bucket.ReadObject(ctx, objName) if err != nil { reearth_log.Warnfc(ctx, "gcsLog: failed to read object (%s): %v", objName, err) continue }api/internal/infrastructure/redis/log.go (1)
109-116
: Consider using Redis pipelining for better performance.When fetching multiple keys, using Redis pipelining can significantly improve performance by reducing network round trips.
+ pipe := r.client.Pipeline() + cmds := make([]*redis.StringCmd, len(keys)) for _, key := range keys { - val, err := r.client.Get(ctx, key).Result() - if err == redis.Nil { - // Skip if key does not exist (deleted) - continue - } else if err != nil { - return nil, fmt.Errorf("failed to get redis value for key=%s: %w", key, err) - } + cmds = append(cmds, pipe.Get(ctx, key)) + } + + _, err = pipe.Exec(ctx) + if err != nil && err != redis.Nil { + return nil, fmt.Errorf("failed to execute pipeline: %w", err) + } + + for i, cmd := range cmds { + val, err := cmd.Result() + if err == redis.Nil { + continue + }api/internal/usecase/interactor/log_test.go (1)
43-101
: Add test for concurrent log access.The current tests don't verify behavior under concurrent access. Consider adding a test case that simulates multiple goroutines accessing logs simultaneously.
+ t.Run("concurrent access", func(t *testing.T) { + li := NewLogInteractor(redisMock, gcsMock, 1*time.Hour) + var wg sync.WaitGroup + const numGoroutines = 10 + + for i := 0; i < numGoroutines; i++ { + wg.Add(1) + go func() { + defer wg.Done() + since := time.Now() + out, err := li.GetLogs(context.Background(), since, id.NewWorkflowID(), id.NewJobID(), &usecase.Operator{}) + assert.NoError(t, err) + assert.NotNil(t, out) + }() + } + wg.Wait() + })
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (2)
api/go.sum
is excluded by!**/*.sum
go.work.sum
is excluded by!**/*.sum
📒 Files selected for processing (10)
api/go.mod
(2 hunks)api/internal/app/repo.go
(3 hunks)api/internal/infrastructure/gcs/log.go
(1 hunks)api/internal/infrastructure/gcs/log_test.go
(1 hunks)api/internal/infrastructure/gcs/real_gcs.go
(1 hunks)api/internal/infrastructure/redis/log.go
(1 hunks)api/internal/infrastructure/redis/log_test.go
(1 hunks)api/internal/usecase/gateway/log.go
(1 hunks)api/internal/usecase/interactor/log.go
(1 hunks)api/internal/usecase/interactor/log_test.go
(1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- api/internal/usecase/interactor/log.go
🔇 Additional comments (3)
api/go.mod (1)
20-20
: Consider updating go-redis to the latest version.The current version (v9.7.0) is not the latest. Consider updating to the latest stable version for potential bug fixes and improvements.
Run this script to check for the latest version and any security advisories:
api/internal/app/repo.go (2)
133-135
:⚠️ Potential issueReplace hardcoded Redis configuration with configuration from config.Config.
The Redis connection string should not be hardcoded and should be configurable.
client := redis.NewClient(&redis.Options{ - Addr: "localhost:6379", + Addr: conf.Redis.Addr, + Password: conf.Redis.Password, + DB: conf.Redis.DB, })Likely invalid or redundant comment.
132-141
: 🛠️ Refactor suggestionAdd Redis configuration validation and connection testing.
The Redis client initialization should validate the configuration and test the connection.
func initLogRedis(ctx context.Context, conf *config.Config) gateway.Log { + if conf.Redis == nil || conf.Redis.Addr == "" { + log.Warn("Redis configuration not found, skipping Redis log initialization") + return nil + } + client := redis.NewClient(&redis.Options{ Addr: conf.Redis.Addr, }) + + if err := client.Ping(ctx).Err(); err != nil { + log.Fatalf("Failed to connect to Redis: %v", err) + } + logRedisRepo, err := redisrepo.NewRedisLog(client) if err != nil { log.Fatalf("Failed to create redis log repository: %v", err) } return logRedisRepo }Likely invalid or redundant comment.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
♻️ Duplicate comments (1)
api/internal/app/repo.go (1)
146-157
: 🛠️ Refactor suggestionAdd configuration validation before GCS initialization.
Add a configuration check before initializing GCS to prevent unnecessary initialization when GCS is not configured.
func initLogGCS(ctx context.Context, conf *config.Config) gateway.Log { + if !conf.GCS.IsConfigured() { + return nil + } c, err := storage.NewClient(ctx) if err != nil { log.Fatalf("Failed to create gcs client: %v", err) } gcsClient := gcs.NewRealGCSClient(c) logGCSRepo, err := gcs.NewGCSLog(gcsClient, conf.GCS.BucketName) if err != nil { log.Fatalf("Failed to create gcs log repository: %v", err) } return logGCSRepo }
🧹 Nitpick comments (1)
api/.env.example (1)
20-23
: Add documentation for Redis environment variables.Add comments explaining the purpose and expected values for each Redis configuration variable.
# Redis for Realtime Logging -REEARTH_FLOW_REDIS_ADDR=localhost:6379 -REEARTH_FLOW_REDIS_PASSWORD= -REEARTH_FLOW_REDIS_DB= +# Redis server address (default: localhost:6379) +REEARTH_FLOW_REDIS_ADDR=localhost:6379 +# Redis password (optional) +REEARTH_FLOW_REDIS_PASSWORD= +# Redis database number (default: 0) +REEARTH_FLOW_REDIS_DB=
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
api/.env.example
(1 hunks)api/internal/app/config/config.go
(1 hunks)api/internal/app/config/redis.go
(1 hunks)api/internal/app/repo.go
(3 hunks)
🔇 Additional comments (4)
api/internal/app/config/config.go (1)
58-60
: LGTM!The Redis configuration field is well-placed and follows the existing configuration pattern.
api/internal/app/repo.go (3)
6-7
: LGTM!The import statements are properly organized and include all necessary dependencies.
Also applies to: 14-14
73-75
: LGTM!The log repository initialization is well-integrated into the existing gateway initialization flow.
132-144
: 🛠️ Refactor suggestionAdd configuration validation before Redis initialization.
Add a configuration check before initializing Redis to prevent unnecessary initialization when Redis is not configured.
func initLogRedis(ctx context.Context, conf *config.Config) gateway.Log { + if !conf.Redis.IsConfigured() { + return nil + } opts := &redis.Options{ Addr: conf.Redis.Addr, Password: conf.Redis.Password, DB: conf.Redis.DB, } client := redis.NewClient(opts) + if err := client.Ping(ctx).Err(); err != nil { + log.Fatalf("Failed to connect to Redis: %v", err) + } logRedisRepo, err := redisrepo.NewRedisLog(client) if err != nil { log.Fatalf("Failed to create redis log repository: %v", err) } return logRedisRepo }Likely invalid or redundant comment.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 6
🧹 Nitpick comments (6)
api/.env.example (2)
20-23
: Add Redis configuration documentation and connection timeout.The Redis configuration should include documentation about each setting's purpose and additional configuration options for production deployments.
Add these configurations and documentation:
# Redis for Realtime Logging +# REEARTH_FLOW_REDIS_ADDR: Redis server address in host:port format REEARTH_FLOW_REDIS_ADDR=localhost:6379 +# REEARTH_FLOW_REDIS_PASSWORD: Redis server password (optional) REEARTH_FLOW_REDIS_PASSWORD= +# REEARTH_FLOW_REDIS_DB: Redis database number REEARTH_FLOW_REDIS_DB="0" +# REEARTH_FLOW_REDIS_TIMEOUT: Connection timeout in seconds +REEARTH_FLOW_REDIS_TIMEOUT=5
25-27
: Add GCS log bucket documentation and region configuration.The GCS log configuration should include documentation and region settings for compliance requirements.
Add these configurations and documentation:
# GCS for Realtime Logging +# REEARTH_FLOW_GCSLOG_BUCKETNAME: GCS bucket name for storing logs REEARTH_FLOW_GCSLOG_BUCKETNAME=bucket_name +# REEARTH_FLOW_GCSLOG_PUBLICATIONCACHECONTROL: Cache control header for log objects REEARTH_FLOW_GCSLOG_PUBLICATIONCACHECONTROL= +# REEARTH_FLOW_GCSLOG_LOCATION: GCS bucket location (e.g., US-EAST1) +REEARTH_FLOW_GCSLOG_LOCATION=api/internal/app/repo.go (1)
6-7
: Group related imports together.The Redis and GCS imports should be grouped with other infrastructure imports for better organization.
Reorder imports:
-"cloud.google.com/go/storage" -"github.com/redis/go-redis/v9" "github.com/reearth/reearth-flow/api/internal/app/config" "github.com/reearth/reearth-flow/api/internal/infrastructure/auth0" "github.com/reearth/reearth-flow/api/internal/infrastructure/fs" "github.com/reearth/reearth-flow/api/internal/infrastructure/gcpbatch" "github.com/reearth/reearth-flow/api/internal/infrastructure/gcs" +"cloud.google.com/go/storage" +"github.com/redis/go-redis/v9" mongorepo "github.com/reearth/reearth-flow/api/internal/infrastructure/mongo" redisrepo "github.com/reearth/reearth-flow/api/internal/infrastructure/redis"Also applies to: 14-14
api/internal/usecase/interactor/log_test.go (3)
15-22
: Enhance mock implementation for better test coverage.The current mock is too simple and doesn't validate input parameters. Consider:
- Add parameter validation
- Track method calls for verification
- Add ability to return different results for different inputs
type mockLogGateway struct { logs []*log.Log err error + // Track calls for verification + calls []struct { + since time.Time + until time.Time + workflowID id.WorkflowID + jobID id.JobID + } } func (m *mockLogGateway) GetLogs(ctx context.Context, since time.Time, until time.Time, workflowID id.WorkflowID, jobID id.JobID) ([]*log.Log, error) { + // Track method call + m.calls = append(m.calls, struct { + since time.Time + until time.Time + workflowID id.WorkflowID + jobID id.JobID + }{since, until, workflowID, jobID}) + + // Validate input parameters + if since.IsZero() { + return nil, errors.New("since time is zero") + } + if until.IsZero() { + return nil, errors.New("until time is zero") + } + if since.After(until) { + return nil, errors.New("since time is after until time") + } return m.logs, m.err }
79-120
: Add test cases for missing error scenarios.Consider adding the following test cases:
- Context cancellation handling
- Empty log results from both Redis and GCS
- Edge cases around the threshold boundary
Example additional test cases:
t.Run("context cancellation", func(t *testing.T) { li := NewLogInteractor(redisMock, gcsMock, 1*time.Hour) ctx, cancel := context.WithCancel(context.Background()) cancel() out, err := li.GetLogs(ctx, time.Now(), id.NewWorkflowID(), id.NewJobID(), &usecase.Operator{}) assert.Nil(t, out) assert.Error(t, err) assert.Contains(t, err.Error(), "context canceled") }) t.Run("empty logs from redis", func(t *testing.T) { emptyRedisMock := &mockLogGateway{logs: []*log.Log{}} li := NewLogInteractor(emptyRedisMock, gcsMock, 1*time.Hour) out, err := li.GetLogs(context.Background(), time.Now(), id.NewWorkflowID(), id.NewJobID(), &usecase.Operator{}) assert.NoError(t, err) assert.Empty(t, out) }) t.Run("exactly at threshold boundary", func(t *testing.T) { li := NewLogInteractor(redisMock, gcsMock, 1*time.Hour) since := time.Now().Add(-1 * time.Hour) out, err := li.GetLogs(context.Background(), since, id.NewWorkflowID(), id.NewJobID(), &usecase.Operator{}) assert.NoError(t, err) // Verify which gateway was used at exactly the threshold })
104-104
: Translate comments to English for consistency.Replace Japanese comments with English for better maintainability:
- since := time.Now().Add(-30 * time.Minute) // Redis 側にアクセスするケース + since := time.Now().Add(-30 * time.Minute) // Case: Accessing Redis gateway - since := time.Now().Add(-2 * time.Hour) // GCS 側にアクセスするケース + since := time.Now().Add(-2 * time.Hour) // Case: Accessing GCS gatewayAlso applies to: 114-114
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (6)
api/.env.example
(1 hunks)api/internal/app/config/config.go
(1 hunks)api/internal/app/config/log.go
(1 hunks)api/internal/app/repo.go
(3 hunks)api/internal/usecase/interactor/log.go
(1 hunks)api/internal/usecase/interactor/log_test.go
(1 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
- api/internal/app/config/config.go
- api/internal/usecase/interactor/log.go
🔇 Additional comments (1)
api/internal/app/repo.go (1)
73-75
: LGTM! Good separation of concerns.The initialization of log repositories is cleanly separated into dedicated functions.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
♻️ Duplicate comments (3)
api/internal/app/config/log.go (1)
26-33
:⚠️ Potential issueEnhance GCS configuration validation.
The current implementation only checks if the bucket name is non-empty. Consider adding comprehensive validation.
type GCSLogConfig struct { + // BucketName is the name of the GCS bucket where logs will be stored + // Format: my-bucket-name (3-63 characters, lowercase, numbers, hyphens) BucketName string `pp:",omitempty"` + // PublicationCacheControl defines cache behavior for published logs + // Format: public, max-age=3600 PublicationCacheControl string `pp:",omitempty"` } func (g GCSLogConfig) IsConfigured() bool { - return g.BucketName != "" + if g.BucketName == "" { + return false + } + // Basic GCS bucket name validation + // Must be 3-63 characters, lowercase, numbers, hyphens + if !regexp.MustCompile(`^[a-z0-9][a-z0-9-]{1,61}[a-z0-9]$`).MatchString(g.BucketName) { + return false + } + if g.PublicationCacheControl != "" { + // Validate cache control format + validDirectives := []string{"public", "private", "no-cache", "max-age="} + isValid := false + for _, directive := range validDirectives { + if strings.Contains(g.PublicationCacheControl, directive) { + isValid = true + break + } + } + if !isValid { + return false + } + } + return true }api/internal/app/repo.go (2)
132-148
:⚠️ Potential issueAdd Redis connection health check and cleanup.
The Redis client initialization should include a connection health check and proper cleanup.
func initLogRedis(ctx context.Context, conf *config.Config) gateway.Log { if conf.RedisLog.IsConfigured() { log.Infofc(ctx, "log: redis storage is used: %s\n", conf.RedisLog.Addr) opts := &redis.Options{ Addr: conf.RedisLog.Addr, Password: conf.RedisLog.Password, DB: conf.RedisLog.DB, + ConnTimeout: time.Second * 5, } client := redis.NewClient(opts) + + // Check connection health + if err := client.Ping(ctx).Err(); err != nil { + log.Warnf("log: redis connection failed: %s\n", err.Error()) + return nil + } + + // Register cleanup on context done + go func() { + <-ctx.Done() + if err := client.Close(); err != nil { + log.Warnf("log: failed to close redis connection: %s\n", err.Error()) + } + }() logRedisRepo, err := redisrepo.NewRedisLog(client) if err != nil { log.Warnf("log: failed to init redis storage: %s\n", err.Error()) + if err := client.Close(); err != nil { + log.Warnf("log: failed to close redis connection: %s\n", err.Error()) + } } return logRedisRepo } return nil }
150-165
:⚠️ Potential issueAdd GCS client cleanup and retry configuration.
The GCS client initialization should include proper cleanup and retry configuration for better reliability.
func initLogGCS(ctx context.Context, conf *config.Config) gateway.Log { if conf.GCSLog.IsConfigured() { log.Infofc(ctx, "log: GCS storage is used: %s\n", conf.GCSLog.BucketName) - c, err := storage.NewClient(ctx) + c, err := storage.NewClient(ctx, + storage.WithRetry( + storage.WithBackoff(gax.Backoff{ + Initial: time.Second, + Max: time.Second * 10, + Multiplier: 2, + }), + storage.WithPolicy(storage.RetryIdempotent), + ), + ) if err != nil { log.Warnf("log: failed to init GCS storage: %s\n", err.Error()) + return nil } + + // Register cleanup on context done + go func() { + <-ctx.Done() + if err := c.Close(); err != nil { + log.Warnf("log: failed to close gcs client: %s\n", err.Error()) + } + }() gcsClient := gcs.NewRealGCSClient(c) logGCSRepo, err := gcs.NewGCSLog(gcsClient, conf.GCSLog.BucketName) if err != nil { log.Warnf("log: failed to init GCS storage: %s\n", err.Error()) + if err := c.Close(); err != nil { + log.Warnf("log: failed to close gcs client: %s\n", err.Error()) + } + return nil } return logGCSRepo } return nil }
🧹 Nitpick comments (1)
api/internal/app/config/log.go (1)
5-24
: Add field documentation and enhance password validation.The Redis configuration implementation looks good, but could benefit from additional documentation and validation.
type RedisLogConfig struct { + // Addr is the Redis server address in format host:port + // Example: localhost:6379 or redis://localhost:6379 Addr string `pp:",omitempty"` + // Password is the optional Redis server password Password string `pp:",omitempty"` + // DB is the Redis database number (0-15) DB int `pp:",omitempty"` } func (r RedisLogConfig) IsConfigured() bool { if r.Addr == "" { return false } if r.DB < 0 || r.DB > 15 { return false } + // Validate password if provided + if r.Password != "" && len(r.Password) < 8 { + return false + } // Basic format validation for Redis address // Example: localhost:6379 or redis://localhost:6379 if !strings.Contains(r.Addr, ":") { return false } return true }
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
api/internal/app/config/log.go
(1 hunks)api/internal/app/repo.go
(3 hunks)
🔇 Additional comments (1)
api/internal/app/repo.go (1)
6-7
: Consider error handling for log initialization failures.While the log initialization looks good, consider handling the case where both Redis and GCS log initialization fail.
// Log gateways.LogRedis = initLogRedis(ctx, conf) gateways.LogGCS = initLogGCS(ctx, conf) + if gateways.LogRedis == nil && gateways.LogGCS == nil { + log.Warnf("log: both Redis and GCS log initialization failed") + }Also applies to: 73-75
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🧹 Nitpick comments (3)
api/internal/adapter/gql/gqlmodel/models_gen.go (1)
219-226
: Consider implementing theNode
interface for consistency.For consistency with other entities in the codebase (e.g.,
Asset
,Deployment
,Job
), consider implementing theNode
interface for theLog
struct:type Log struct { + ID ID `json:"id"` WorkflowID ID `json:"workflowId"` JobID ID `json:"jobId"` NodeID *ID `json:"nodeId,omitempty"` Timestamp time.Time `json:"timestamp"` LogLevel LogLevel `json:"logLevel"` Message string `json:"message"` } + +func (Log) IsNode() {} +func (this Log) GetID() ID { return this.ID }api/internal/infrastructure/gcs/real_gcs.go (2)
12-22
: Add godoc comments for better documentation.While the implementation is correct, adding godoc comments would improve code documentation and help other developers understand the purpose and usage of these types and methods.
Add documentation comments like this:
+// realGCSClient implements GCSClient interface using Google Cloud Storage. type realGCSClient struct { client *storage.Client } +// NewRealGCSClient creates a new GCSClient with the provided storage.Client. func NewRealGCSClient(client *storage.Client) GCSClient { return &realGCSClient{client: client} } +// Bucket returns a GCSBucket interface for the specified bucket name. func (r *realGCSClient) Bucket(name string) GCSBucket { return &realGCSBucket{bucket: r.client.Bucket(name)} }
28-46
: Consider performance optimizations and context handling.While the implementation is functionally correct, consider these improvements:
- Pre-allocate the slice to improve performance when dealing with many objects
- Consider adding a context timeout to prevent long-running operations
Here's a suggested implementation:
func (b *realGCSBucket) ListObjects(ctx context.Context, prefix string) ([]string, error) { - var names []string + names := make([]string, 0, 100) // Pre-allocate with a reasonable capacity + + // Add timeout if not already set in context + ctx, cancel := context.WithTimeout(ctx, 30*time.Second) + defer cancel() it := b.bucket.Objects(ctx, &storage.Query{ Prefix: prefix, })
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (5)
api/internal/adapter/gql/generated.go
(14 hunks)api/internal/adapter/gql/gqlmodel/models_gen.go
(2 hunks)api/internal/adapter/gql/resolver_query.go
(2 hunks)api/internal/infrastructure/gcs/real_gcs.go
(1 hunks)api/internal/infrastructure/redis/log_test.go
(1 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
- api/internal/adapter/gql/resolver_query.go
- api/internal/infrastructure/redis/log_test.go
🔇 Additional comments (5)
api/internal/adapter/gql/gqlmodel/models_gen.go (2)
613-658
: LGTM! Well-structured log level implementation.The
LogLevel
type implementation:
- Follows a consistent pattern with other enum types
- Uses standard log severity levels
- Includes proper GraphQL marshaling support
Line range hint
1-1
: Verify GraphQL schema changes.Since this is a generated file, ensure that the corresponding changes in the GraphQL schema (
api/gql/log.graphql
) properly define:
- The
Log
type with all fields- The
LogLevel
enum with all values- Any queries or mutations that use these types
✅ Verification successful
GraphQL schema changes are properly defined ✓
The schema in
api/gql/log.graphql
correctly defines all the necessary types and fields that match the generated code, including:
- Complete
LogLevel
enum- Well-structured
Log
type with all required fields- Properly defined
logs
query with required parameters🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Description: Check if the GraphQL schema properly defines the Log types cat api/gql/log.graphqlLength of output: 289
api/internal/adapter/gql/generated.go (2)
2399-2418
: Well-structured GraphQL schema for logging system!The Log type and LogLevel enum are well-defined with:
- Comprehensive log levels (ERROR to TRACE)
- Required fields marked as non-null
- Proper use of DateTime scalar for timestamp
- Clear and consistent naming
20467-20509
: Excellent performance optimization using concurrent marshaling!The implementation efficiently handles array marshaling by:
- Using goroutines for parallel processing when array length > 1
- Optimizing single-element arrays by avoiding goroutine overhead
- Proper synchronization using WaitGroup
- Thorough error handling with panic recovery
api/internal/infrastructure/gcs/real_gcs.go (1)
1-10
: LGTM! Well-structured package with appropriate imports.The package structure is clean and imports are properly organized with the required dependencies.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (3)
api/internal/usecase/interactor/log_test.go (3)
15-22
: Enhance mock implementation for better testing.Consider improving the mock implementation by:
- Adding method call tracking to verify behavior
- Implementing compile-time interface validation
+var _ usecase.LogGateway = (*mockLogGateway)(nil) // Compile-time interface validation type mockLogGateway struct { logs []*log.Log err error + calls []struct { + since time.Time + until time.Time + workflowID id.WorkflowID + jobID id.JobID + } } func (m *mockLogGateway) GetLogs(ctx context.Context, since time.Time, until time.Time, workflowID id.WorkflowID, jobID id.JobID) ([]*log.Log, error) { + m.calls = append(m.calls, struct { + since time.Time + until time.Time + workflowID id.WorkflowID + jobID id.JobID + }{since, until, workflowID, jobID}) return m.logs, m.err }
24-41
: Add missing test cases for constructor edge cases.Consider adding the following test cases:
- Constructor behavior with nil gateways
- Zero threshold value handling
func TestNewLogInteractor(t *testing.T) { // ... existing tests ... + t.Run("nil gateways should still construct", func(t *testing.T) { + li := NewLogInteractor(nil, nil, 10*time.Minute) + assert.NotNil(t, li) + }) + + t.Run("zero threshold should default to 60 minutes", func(t *testing.T) { + redisMock := &mockLogGateway{} + gcsMock := &mockLogGateway{} + li := NewLogInteractor(redisMock, gcsMock, 0) + logi := li.(*LogInteractor) + assert.Equal(t, 60*time.Minute, logi.recentLogsThreshold) + }) }
43-121
: Add test cases for missing scenarios.Consider adding the following test cases:
- Validation of the 'until' parameter
- Context cancellation handling
- Edge case where 'since' equals the threshold boundary
func TestLogInteractor_GetLogs(t *testing.T) { // ... existing setup ... + t.Run("respect until parameter", func(t *testing.T) { + li := NewLogInteractor(redisMock, gcsMock, 1*time.Hour) + now := time.Date(2025, 1, 1, 12, 0, 0, 0, time.UTC) + since := now.Add(-30 * time.Minute) + until := now.Add(-15 * time.Minute) + out, err := li.GetLogs(context.Background(), since, until, id.NewWorkflowID(), id.NewJobID(), &usecase.Operator{}) + assert.NoError(t, err) + assert.Equal(t, redisLogs, out) + }) + + t.Run("handle context cancellation", func(t *testing.T) { + li := NewLogInteractor(redisMock, gcsMock, 1*time.Hour) + ctx, cancel := context.WithCancel(context.Background()) + cancel() + out, err := li.GetLogs(ctx, time.Now(), id.NewWorkflowID(), id.NewJobID(), &usecase.Operator{}) + assert.Error(t, err) + assert.True(t, errors.Is(err, context.Canceled)) + }) + + t.Run("exactly at threshold boundary", func(t *testing.T) { + li := NewLogInteractor(redisMock, gcsMock, 1*time.Hour) + now := time.Date(2025, 1, 1, 12, 0, 0, 0, time.UTC) + since := now.Add(-1 * time.Hour) + out, err := li.GetLogs(context.Background(), since, id.NewWorkflowID(), id.NewJobID(), &usecase.Operator{}) + assert.NoError(t, err) + // Verify which gateway was used + assert.Equal(t, redisLogs, out) + }) }
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (6)
api/go.mod
(2 hunks)api/internal/app/config/log.go
(1 hunks)api/internal/infrastructure/gcs/log.go
(1 hunks)api/internal/infrastructure/redis/log.go
(1 hunks)api/internal/usecase/interactor/common.go
(2 hunks)api/internal/usecase/interactor/log_test.go
(1 hunks)
🚧 Files skipped from review as they are similar to previous changes (5)
- api/internal/usecase/interactor/common.go
- api/go.mod
- api/internal/app/config/log.go
- api/internal/infrastructure/redis/log.go
- api/internal/infrastructure/gcs/log.go
🔇 Additional comments (1)
api/internal/usecase/interactor/log_test.go (1)
43-54
: Make tests deterministic by using fixed timestamps.The current implementation uses
time.Now()
which can make tests flaky.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
♻️ Duplicate comments (1)
api/internal/app/repo.go (1)
130-146
: 🛠️ Refactor suggestionAdd Redis connection health check and cleanup.
The Redis client initialization should include:
- Connection health check using
Ping
- Connection timeout configuration
- Proper cleanup when context is done
🧹 Nitpick comments (2)
api/internal/app/config/log.go (2)
5-9
: Add field documentation and consider password security.The struct fields should be documented to explain their purpose and requirements. Also, consider adding password validation.
type RedisLogConfig struct { + // Addr is the Redis server address in format host:port + // Example: localhost:6379 or redis://localhost:6379 Addr string `pp:",omitempty"` + // Password is the Redis server authentication password (optional) + // For security, ensure this is stored securely and not in plain text Password string `pp:",omitempty"` + // DB is the Redis database number (0-15) DB int `pp:",omitempty"` }
11-22
: Enhance Redis address validation.While the current validation is good, consider adding more robust address validation using regex to ensure proper format.
+import "regexp" + func (r RedisLogConfig) IsConfigured() bool { if r.Addr == "" { return false } if r.DB < 0 || r.DB > 15 { return false } - if !strings.Contains(r.Addr, ":") { + // Validate Redis address format: host:port or redis://host:port + redisAddrPattern := `^(redis://)?([a-zA-Z0-9.-]+):(\d{1,5})$` + if !regexp.MustCompile(redisAddrPattern).MatchString(r.Addr) { return false } return true }
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (8)
api/.env.example
(1 hunks)api/internal/app/config/config.go
(1 hunks)api/internal/app/config/log.go
(1 hunks)api/internal/app/repo.go
(3 hunks)api/internal/usecase/gateway/container.go
(1 hunks)api/internal/usecase/interactor/common.go
(1 hunks)api/internal/usecase/interactor/log.go
(1 hunks)api/internal/usecase/interactor/log_test.go
(1 hunks)
🚧 Files skipped from review as they are similar to previous changes (5)
- api/internal/usecase/gateway/container.go
- api/internal/app/config/config.go
- api/.env.example
- api/internal/usecase/interactor/common.go
- api/internal/usecase/interactor/log_test.go
🔇 Additional comments (3)
api/internal/app/config/log.go (1)
1-4
: LGTM!Package name and imports are appropriate for the functionality.
api/internal/usecase/interactor/log.go (1)
16-18
: 🛠️ Refactor suggestionConsider adding GCS gateway field.
The struct only has Redis gateway but based on the PR objectives, GCS integration is also planned. Consider adding the GCS gateway field.
type LogInteractor struct { logsGatewayRedis gateway.Log + logsGatewayGCS gateway.Log }
Likely invalid or redundant comment.
api/internal/app/repo.go (1)
72-73
: Consider initializing GCS log gateway.Based on the PR objectives, GCS integration is planned but not initialized here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (1)
api/internal/adapter/gql/generated.go (1)
148-148
: Consider adding pagination to the logs query.The logs query might return a large number of results, which could impact performance. Consider implementing pagination similar to how it's done for the jobs query.
Apply this diff to add pagination to the schema:
- logs(since: DateTime!, jobId: ID!): [Log] + logs(since: DateTime!, jobId: ID!, pagination: Pagination): LogConnection! +type LogConnection { + edges: [LogEdge!]! + pageInfo: PageInfo! +} +type LogEdge { + node: Log! + cursor: String! +}Also applies to: 171-177, 280-280
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
api/gql/job.graphql
(1 hunks)api/internal/adapter/gql/generated.go
(24 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- api/gql/job.graphql
🔇 Additional comments (4)
api/internal/adapter/gql/generated.go (4)
2404-2423
: LGTM! Well-structured GraphQL schema for logs.The Log type and LogLevel enum are well-designed with appropriate field requirements and standard log levels.
11076-11138
: LGTM! Well-implemented query resolvers with proper error handling.The logs query resolver and field contexts are well-implemented with comprehensive error handling and middleware integration.
17579-17633
: LGTM! Robust Log type implementation with proper null handling.The Log type implementation includes proper null handling for required fields and comprehensive error recovery.
21631-21677
: LGTM! Efficient marshaling implementation with proper concurrency handling.The marshaling implementation efficiently handles arrays with proper concurrency and error recovery.
Overview
This pull request introduces changes to add real-time logging functionality using Redis, along with various enhancements to the GraphQL API to support log retrieval. The most important changes include modifications to environment configuration, updates to GraphQL schemas and resolvers, and the addition of Redis-based log storage and retrieval.
Environment Configuration:
api/.env.example
andapi/internal/app/config/log.go
.GraphQL API Enhancements:
Job
type and added a newLog
type andLogLevel
enum inapi/gql/job.graphql
andapi/gql/log.graphql
.api/internal/adapter/gql/resolver_query.go
.api/internal/adapter/gql/gqlmodel/convert_log.go
and corresponding tests inapi/internal/adapter/gql/gqlmodel/convert_log_test.go
.Redis-based Log Storage:
api/internal/infrastructure/redis/log.go
.api/internal/app/repo.go
.Dependency Updates:
api/go.mod
.What I've done
What I haven't done
How I tested
Screenshot
Which point I want you to review particularly
Memo
Summary by CodeRabbit
Summary by CodeRabbit
Based on the comprehensive summary, here are the updated release notes:
New Features
logs
field added to theJob
type in the GraphQL schema for querying associated logs.Infrastructure
GraphQL
logs
query added to retrieve log entries with filtering capabilities.Performance