From f167c4b8b7fa2fd1f63bc5eb57941e3c1a55f72b Mon Sep 17 00:00:00 2001 From: Ashley Coleman Date: Mon, 9 Sep 2024 10:15:32 -0700 Subject: [PATCH 1/4] stdlib: Move file filters into runners --- share/wake/lib/system/io.wake | 26 +++++++-- share/wake/lib/system/job.wake | 14 ++--- share/wake/lib/system/job_cache_runner.wake | 2 +- .../wake/lib/system/remote_cache_runner.wake | 12 ++-- share/wake/lib/system/runner.wake | 56 ++++++++++++++++--- 5 files changed, 81 insertions(+), 29 deletions(-) diff --git a/share/wake/lib/system/io.wake b/share/wake/lib/system/io.wake index 73bd439a2..4aa7f7dcb 100644 --- a/share/wake/lib/system/io.wake +++ b/share/wake/lib/system/io.wake @@ -138,7 +138,7 @@ def writeRunner (content: String) = def primWrite (mode: Integer) (path: String) (content: String): Result String String = (\_ \_ \_ prim "write") mode path content - def run (job: Job) ((RunnerInput _ cmd vis _ _ _ _ _ predict _): RunnerInput): Result RunnerOutput Error = + def run (job: Job) ((RunnerInput _ cmd vis _ _ _ _ _ predict _ fnInputs fnOutputs): RunnerInput): Result RunnerOutput Error = # Command must be ("", "-m", "{string mode}", "{string path}", Nil) require "", "-m", smode, path, Nil = cmd else panic "writeImp violated command-line contract" @@ -161,7 +161,16 @@ def writeRunner (content: String) = match writeTask Fail f -> failWithError f Pass path -> - RunnerOutput (vis | map getPathName) (path,) reality + def fileInputs = + vis + | map getPathName + | fnInputs + + def fileOutputs = + (path, Nil) + | fnOutputs + + RunnerOutput fileInputs fileOutputs reality | Pass makeRunner "write" run @@ -277,7 +286,7 @@ def mkdirRunner: Runner = def primMkdir (mode: Integer) (path: String): Result String String = (\_ \_ prim "mkdir") mode path - def run (job: Job) ((RunnerInput _ cmd vis _ _ _ _ _ predict _): RunnerInput): Result RunnerOutput Error = + def run (job: Job) ((RunnerInput _ cmd vis _ _ _ _ _ predict _ fnInputs fnOutputs): RunnerInput): Result RunnerOutput Error = # 
Command must be ("", "-m", "{string mode}", "{string path}", Nil) require "", "-m", smode, path, Nil = cmd else panic "mkdirImp violated command-line contract" @@ -300,7 +309,16 @@ def mkdirRunner: Runner = match mkdirTask Fail f -> failWithError f Pass path -> - RunnerOutput (vis | map getPathName) (path,) reality + def fileInputs = + vis + | map getPathName + | fnInputs + + def fileOutputs = + (path, Nil) + | fnOutputs + + RunnerOutput fileInputs fileOutputs reality | Pass makeRunner "mkdir" run diff --git a/share/wake/lib/system/job.wake b/share/wake/lib/system/job.wake index 263141c1c..ff42e2058 100644 --- a/share/wake/lib/system/job.wake +++ b/share/wake/lib/system/job.wake @@ -150,22 +150,18 @@ def runAlways cmd env dir stdin res uusage finputs foutputs vis keep run echo st | getOrElse uusage def output = - run job (Pass (RunnerInput label cmd vis env dir stdin res prefix usage isatty)) + run job (Pass (RunnerInput label cmd vis env dir stdin res prefix usage isatty finputs foutputs)) def final _ = match output Fail e -> primJobFailLaunch job e Pass (RunnerOutput inputs outputs reality) -> - def input = - finputs inputs - | map simplify - | implode - - def output = - foutputs outputs + def hashedOutputs = + outputs | computeHashes prefix | implode - primJobFinish job input output (implode outputs) reality + # TODO: Determine the effect of all_outputs being filtered + primJobFinish job inputs.implode hashedOutputs hashedOutputs reality # Make sure we don't hash files before the job has stopped running def _ = waitJobMerged final job diff --git a/share/wake/lib/system/job_cache_runner.wake b/share/wake/lib/system/job_cache_runner.wake index 8ed0c1d36..b96eb904c 100644 --- a/share/wake/lib/system/job_cache_runner.wake +++ b/share/wake/lib/system/job_cache_runner.wake @@ -60,7 +60,7 @@ export def mkJobCacheRunner (hashFn: RunnerInput => Result String Error) (wakero def job_cache_add str = prim "job_cache_add" def run (job: Job) (input: RunnerInput): Result 
RunnerOutput Error = - def (RunnerInput label cmd vis env dir stdin _ prefix _ _) = input + def (RunnerInput label cmd vis env dir stdin _ prefix _ _ _ _) = input def mkVisJson (Path path hash) = JObject ( diff --git a/share/wake/lib/system/remote_cache_runner.wake b/share/wake/lib/system/remote_cache_runner.wake index b75d159ec..7c90677bc 100644 --- a/share/wake/lib/system/remote_cache_runner.wake +++ b/share/wake/lib/system/remote_cache_runner.wake @@ -237,7 +237,7 @@ export def mkRemoteCacheRunner (rscApi: RemoteCacheApi) (hashFn: RunnerInput => require True = shouldDebugRemoteCache Unit def _ = - writeTempFile "remote.cache.lookup.fail" "label: {input.getRunnerInputLabel}\nerror: {err | format}" + writeTempFile "remote.cache.lookup.fail" "label: {label}\nerror: {err | format}" True @@ -252,9 +252,7 @@ export def mkRemoteCacheRunner (rscApi: RemoteCacheApi) (hashFn: RunnerInput => require True = shouldDebugRemoteCache Unit def _ = breadcrumb "{label}: Did not find a match" - - def _ = - writeTempFile "remote.cache.lookup.miss" "label: {input.getRunnerInputLabel}" + def _ = writeTempFile "remote.cache.lookup.miss" "label: {label}" True @@ -277,11 +275,11 @@ export def mkRemoteCacheRunner (rscApi: RemoteCacheApi) (hashFn: RunnerInput => ## --- Helper functions --- # Creates a CacheSearchRequest from the various inputs to a runner -def mkSearchRequest ((RunnerInput label cmd vis env dir stdin _res _prefix _usage isAtty): RunnerInput) (hidden: String) = +def mkSearchRequest ((RunnerInput label cmd vis env dir stdin _res _prefix _usage isAtty _ _): RunnerInput) (hidden: String) = CacheSearchRequest label cmd dir env hidden isAtty stdin vis # Creates a CachePostJobRequest from the various inputs and outputs of a runner -def mkPostJobRequest ((RunnerInput label cmd vis env dir stdin _res _prefix _ isAtty): RunnerInput) (output: RunnerOutput) (hidden: String) (stdoutBlobId: String) (stderrBlobId: String) (files: List CachePostRequestOutputFile) (directories: List 
CachePostRequestOutputDirectory) (symlinks: List CachePostRequestOutputSymlink) = +def mkPostJobRequest ((RunnerInput label cmd vis env dir stdin _res _prefix _ isAtty _ _): RunnerInput) (output: RunnerOutput) (hidden: String) (stdoutBlobId: String) (stderrBlobId: String) (files: List CachePostRequestOutputFile) (directories: List CachePostRequestOutputDirectory) (symlinks: List CachePostRequestOutputSymlink) = def Usage status runtime cputime mem ibytes obytes = output.getRunnerOutputUsage CachePostRequest @@ -306,7 +304,7 @@ def mkPostJobRequest ((RunnerInput label cmd vis env dir stdin _res _prefix _ is obytes # Creates a CacheAllowedRequest from the various inputs and outputs of a runner -def mkCacheAllowedRequest ((RunnerInput label cmd vis env dir stdin _res _prefix _ isAtty): RunnerInput) (output: RunnerOutput) (hidden: String) = +def mkCacheAllowedRequest ((RunnerInput label cmd vis env dir stdin _res _prefix _ isAtty _ _): RunnerInput) (output: RunnerOutput) (hidden: String) = def Usage status runtime cputime mem _ibytes obytes = output.getRunnerOutputUsage CacheAllowedRequest label cmd dir env hidden isAtty stdin vis status runtime cputime mem obytes diff --git a/share/wake/lib/system/runner.wake b/share/wake/lib/system/runner.wake index 2caf4b23a..0f0378085 100644 --- a/share/wake/lib/system/runner.wake +++ b/share/wake/lib/system/runner.wake @@ -47,6 +47,12 @@ export tuple RunnerInput = export Record: Usage # Determines if job should run in psuedoterminal export IsAtty: Boolean + # Modify the Runner's reported inputs (files read). + # May only be called once for a given job. Wrapping runners are required to maintain this invariant + export FnInputs: (List String => List String) + # Modify the Runner's reported outputs (files created). + # May only be called once for a given job. 
Wrapping runners are required to maintain this invariant + export FnOutputs: (List String => List String) export tuple RunnerOutput = export Inputs: List String @@ -84,12 +90,20 @@ export def makeRunner (name: String) (run: Job => RunnerInput => Result RunnerOu # You must use Fn{Inputs,Outputs} to fill in this information for wake to maintain safety and reusability # Advanced usage only, proceed with caution export def localRunner: Runner = - def run (job: Job) ((RunnerInput _ cmd vis env dir stdin _ _ predict isatty): RunnerInput): Result RunnerOutput Error = - def jobKey = JobKey dir stdin env.implode cmd.implode 0 "" isatty.booleanToInteger + def run (job: Job) ((RunnerInput _ cmd vis env dir stdin _ _ predict isAtty fnInputs fnOutputs): RunnerInput): Result RunnerOutput Error = + def jobKey = JobKey dir stdin env.implode cmd.implode 0 "" isAtty.booleanToInteger def _ = primJobLaunch job jobKey predict + def fileInputs = + vis + | map getPathName + | fnInputs + + # Caller need to fill this in from nothing + def fileOutputs = fnOutputs Nil + job.getJobReality - |< RunnerOutput (vis | map getPathName) Nil + |< RunnerOutput fileInputs fileOutputs makeRunner "local" run @@ -97,11 +111,19 @@ export def localRunner: Runner = # # This runner is useful for tracking a unit of work that is job like but not launched as a process export def virtualRunner: Runner = - def run (job: Job) ((RunnerInput _ _ vis _ _ _ _ _ predict _): RunnerInput): Result RunnerOutput Error = + def run (job: Job) ((RunnerInput _ _ vis _ _ _ _ _ predict _ fnInputs fnOutputs): RunnerInput): Result RunnerOutput Error = def _ = primJobVirtual job "" "" predict + def fileInputs = + vis + | map getPathName + | fnInputs + + # Caller need to fill this in from nothing + def fileOutputs = fnOutputs Nil + job.getJobReality - |< RunnerOutput (vis | map getPathName) Nil + |< RunnerOutput fileInputs fileOutputs makeRunner "virtual" run @@ -172,7 +194,7 @@ export def makeJSONRunner ((JSONRunnerPlan rawScript 
extraArgs extraEnv estimate def script = which (simplify rawScript) def executeOk = access script xOK - def run (job: Job) ((RunnerInput label command visible environment directory stdin res prefix record isatty): RunnerInput): Result RunnerOutput Error = + def run (job: Job) ((RunnerInput label command visible environment directory stdin res prefix record isatty fnInputs fnOutputs): RunnerInput): Result RunnerOutput Error = require True = executeOk else failWithError "Runner {script} is not executable" @@ -219,8 +241,21 @@ export def makeJSONRunner ((JSONRunnerPlan rawScript extraArgs extraEnv estimate def cmd = script, "-I", "-p", specFile, "-o", resultFile, extraArgs # Rewrite input so that the local runner can run the job with a configured sandbox + # The identity function is passed to the file input/output filters so no info is lost def localInput = - RunnerInput label cmd Nil (extraEnv ++ environment) "." "" Nil prefix (estimate record) isatty + RunnerInput + label + cmd + Nil + (extraEnv ++ environment) + "." 
+ "" + Nil + prefix + (estimate record) + isatty + identity + identity # Dispatch to the local runner via composition and get the outputs def (Runner _ localRun) = localRunner @@ -258,8 +293,13 @@ export def makeJSONRunner ((JSONRunnerPlan rawScript extraArgs extraEnv estimate | getOrElse Nil | mapPartial getJString + def jsonInputs = getK `inputs` + def jsonOutputs = getK `outputs` + def filteredInputs = fnInputs jsonInputs + def filteredOutputs = fnOutputs jsonOutputs + match usageResult Fail f -> Fail (makeError f) - Pass usage -> Pass (RunnerOutput (getK `inputs`) (getK `outputs`) usage) + Pass usage -> Pass (RunnerOutput filteredInputs filteredOutputs usage) makeRunner "json-{script}" run From dfcb48318d28fd0c12bcae36034bbe9057f4159e Mon Sep 17 00:00:00 2001 From: Ashley Coleman Date: Mon, 9 Sep 2024 14:20:07 -0700 Subject: [PATCH 2/4] address comments --- share/wake/lib/system/runner.wake | 26 ++++++++++++++++++++++++-- 1 file changed, 24 insertions(+), 2 deletions(-) diff --git a/share/wake/lib/system/runner.wake b/share/wake/lib/system/runner.wake index 0f0378085..2201ff912 100644 --- a/share/wake/lib/system/runner.wake +++ b/share/wake/lib/system/runner.wake @@ -48,10 +48,32 @@ export tuple RunnerInput = # Determines if job should run in psuedoterminal export IsAtty: Boolean # Modify the Runner's reported inputs (files read). - # May only be called once for a given job. Wrapping runners are required to maintain this invariant + # Must be called exactly once for a given job. + # + # FnInputs may only be called once as the user provided function may not be idempotent. + # Ex: FnInputs = tail being called multiple times would drop an extra file each time it's called + # while the user only intended for the very first file to be dropped + # + # Note for wrapping runners: + # It is expected that a wrapping runner maintain this invariant. What that looks like will + # depend heavily on the wrapper and requires due diligence. 
A cache wrapper for example + # should pass it along unmodified since it doesn't care about the actual inputs while + # a smart wrapper tracking file reads must pass a no-op function to the inner runner + # so that it can apply filtering after collecting the files read. export FnInputs: (List String => List String) # Modify the Runner's reported outputs (files created). - # May only be called once for a given job. Wrapping runners are required to maintain this invariant + # Must be called exactly once for a given job. + # + # FnOutputs may only be called once as the user provided function may not be idempotent. + # Ex: FnOutputs = tail being called multiple times would drop an extra file each time it's called + # while the user only intended for the very first file to be dropped + # + # Note for wrapping runners: + # It is expected that a wrapping runner maintain this invariant. What that looks like will + # depend heavily on the wrapper and requires due diligence. A cache wrapper for example + # should pass it along unmodified since it doesn't care about the actual outputs while + # a smart wrapper tracking file writes must pass a no-op function to the inner runner + # so that it can apply filtering after collecting the files written. 
export FnOutputs: (List String => List String) export tuple RunnerOutput = From 5a4ec72cc55f8ead8c63417a3bb9241935fab8c7 Mon Sep 17 00:00:00 2001 From: Ashley Coleman Date: Wed, 11 Sep 2024 17:03:05 -0700 Subject: [PATCH 3/4] Push cleanable files into RunnerOutput --- share/wake/lib/system/io.wake | 14 +++++------ share/wake/lib/system/job.wake | 5 ++-- share/wake/lib/system/job_cache_runner.wake | 6 ++--- .../wake/lib/system/remote_cache_runner.wake | 2 +- share/wake/lib/system/runner.wake | 24 +++++++++++++------ 5 files changed, 29 insertions(+), 22 deletions(-) diff --git a/share/wake/lib/system/io.wake b/share/wake/lib/system/io.wake index 4aa7f7dcb..7c0972e89 100644 --- a/share/wake/lib/system/io.wake +++ b/share/wake/lib/system/io.wake @@ -166,11 +166,10 @@ def writeRunner (content: String) = | map getPathName | fnInputs - def fileOutputs = - (path, Nil) - | fnOutputs + def cleanable = (path, Nil) + def fileOutputs = fnOutputs cleanable - RunnerOutput fileInputs fileOutputs reality + RunnerOutput fileInputs fileOutputs cleanable reality | Pass makeRunner "write" run @@ -314,11 +313,10 @@ def mkdirRunner: Runner = | map getPathName | fnInputs - def fileOutputs = - (path, Nil) - | fnOutputs + def cleanable = (path, Nil) + def fileOutputs = fnOutputs cleanable - RunnerOutput fileInputs fileOutputs reality + RunnerOutput fileInputs fileOutputs cleanable reality | Pass makeRunner "mkdir" run diff --git a/share/wake/lib/system/job.wake b/share/wake/lib/system/job.wake index ff42e2058..0729e2bc4 100644 --- a/share/wake/lib/system/job.wake +++ b/share/wake/lib/system/job.wake @@ -154,14 +154,13 @@ def runAlways cmd env dir stdin res uusage finputs foutputs vis keep run echo st def final _ = match output Fail e -> primJobFailLaunch job e - Pass (RunnerOutput inputs outputs reality) -> + Pass (RunnerOutput inputs outputs cleanable reality) -> def hashedOutputs = outputs | computeHashes prefix | implode - # TODO: Determine the effect of all_outputs being filtered - 
primJobFinish job inputs.implode hashedOutputs hashedOutputs reality + primJobFinish job inputs.implode hashedOutputs cleanable.implode reality # Make sure we don't hash files before the job has stopped running def _ = waitJobMerged final job diff --git a/share/wake/lib/system/job_cache_runner.wake b/share/wake/lib/system/job_cache_runner.wake index b96eb904c..111c22697 100644 --- a/share/wake/lib/system/job_cache_runner.wake +++ b/share/wake/lib/system/job_cache_runner.wake @@ -166,7 +166,7 @@ export def mkJobCacheRunner (hashFn: RunnerInput => Result String Error) (wakero def predict = Usage status runtime cputime mem ibytes obytes def _ = primJobVirtual job stdout stderr predict - Pass (RunnerOutput inputs outputs predict) + Pass (RunnerOutput inputs outputs outputs predict) def _ = require True = isDebugOn @@ -176,7 +176,7 @@ export def mkJobCacheRunner (hashFn: RunnerInput => Result String Error) (wakero True # Now we need to run the job - require Pass (RunnerOutput inputs outputs usage) = baseDoIt job (Pass input) + require Pass (RunnerOutput inputs outputs cleanable usage) = baseDoIt job (Pass input) def Usage status runtime cputime mem ibytes obytes = usage def inputsTree = listToTree scmpCanonical inputs @@ -243,6 +243,6 @@ export def mkJobCacheRunner (hashFn: RunnerInput => Result String Error) (wakero job_cache_add jobCacheAddJson - Pass (RunnerOutput (map getPathName vis) outputs usage) + Pass (RunnerOutput (map getPathName vis) outputs cleanable usage) makeRunner "job-cache: {name}" run diff --git a/share/wake/lib/system/remote_cache_runner.wake b/share/wake/lib/system/remote_cache_runner.wake index 7c90677bc..6e1c96533 100644 --- a/share/wake/lib/system/remote_cache_runner.wake +++ b/share/wake/lib/system/remote_cache_runner.wake @@ -216,7 +216,7 @@ export def mkRemoteCacheRunner (rscApi: RemoteCacheApi) (hashFn: RunnerInput => def _ = primJobVirtual job stdout stderr predict - Pass (RunnerOutput inputs outputs predict) + Pass (RunnerOutput inputs 
outputs outputs predict) def run (job: Job) (input: RunnerInput): Result RunnerOutput Error = def label = input.getRunnerInputLabel diff --git a/share/wake/lib/system/runner.wake b/share/wake/lib/system/runner.wake index 2201ff912..6bd5da0da 100644 --- a/share/wake/lib/system/runner.wake +++ b/share/wake/lib/system/runner.wake @@ -76,9 +76,17 @@ export tuple RunnerInput = # so that it can apply filtering after collecting the files written. export FnOutputs: (List String => List String) +# The result returned by a Runner after running a job export tuple RunnerOutput = + # The filtered list of files actually read by the job. It can be used for more precise reruns export Inputs: List String + # The filtered list of files output by the job. These files are hashed and elevated to Path export Outputs: List String + # The list of files output by the job that should be deleted by wake --clean. May include files + # listed in Outputs as they will be removed later. The files in the list that are not in + # Outputs will not be hashed, published, or elevated to Path but will be deleted by wake --clean + export CleanableOutputs: List String + # The actual resource usage of the job that will be written to the database export Usage: Usage # A Runner describes a way to invoke a Plan to get a Job @@ -121,11 +129,12 @@ export def localRunner: Runner = | map getPathName | fnInputs - # Caller need to fill this in from nothing - def fileOutputs = fnOutputs Nil + # Caller needs to fill this in from nothing + def cleanable = Nil + def fileOutputs = fnOutputs cleanable job.getJobReality - |< RunnerOutput fileInputs fileOutputs + |< RunnerOutput fileInputs fileOutputs cleanable makeRunner "local" run @@ -141,11 +150,12 @@ export def virtualRunner: Runner = | map getPathName | fnInputs - # Caller need to fill this in from nothing - def fileOutputs = fnOutputs Nil + # Caller needs to fill this in from nothing + def cleanable = Nil + def fileOutputs = fnOutputs cleanable job.getJobReality - 
|< RunnerOutput fileInputs fileOutputs + |< RunnerOutput fileInputs fileOutputs cleanable makeRunner "virtual" run @@ -322,6 +332,6 @@ export def makeJSONRunner ((JSONRunnerPlan rawScript extraArgs extraEnv estimate match usageResult Fail f -> Fail (makeError f) - Pass usage -> Pass (RunnerOutput filteredInputs filteredOutputs usage) + Pass usage -> Pass (RunnerOutput filteredInputs filteredOutputs jsonOutputs usage) makeRunner "json-{script}" run From 05d35baefada9d0a35abbc7448243d2fb2b1a58f Mon Sep 17 00:00:00 2001 From: Ashley Coleman Date: Wed, 11 Sep 2024 17:09:20 -0700 Subject: [PATCH 4/4] cleanup --- share/wake/lib/system/job_cache_runner.wake | 2 +- share/wake/lib/system/remote_cache_runner.wake | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/share/wake/lib/system/job_cache_runner.wake b/share/wake/lib/system/job_cache_runner.wake index 111c22697..fb3a51da5 100644 --- a/share/wake/lib/system/job_cache_runner.wake +++ b/share/wake/lib/system/job_cache_runner.wake @@ -166,7 +166,7 @@ export def mkJobCacheRunner (hashFn: RunnerInput => Result String Error) (wakero def predict = Usage status runtime cputime mem ibytes obytes def _ = primJobVirtual job stdout stderr predict - Pass (RunnerOutput inputs outputs outputs predict) + Pass (RunnerOutput inputs outputs Nil predict) def _ = require True = isDebugOn diff --git a/share/wake/lib/system/remote_cache_runner.wake b/share/wake/lib/system/remote_cache_runner.wake index 6e1c96533..04936d635 100644 --- a/share/wake/lib/system/remote_cache_runner.wake +++ b/share/wake/lib/system/remote_cache_runner.wake @@ -216,7 +216,7 @@ export def mkRemoteCacheRunner (rscApi: RemoteCacheApi) (hashFn: RunnerInput => def _ = primJobVirtual job stdout stderr predict - Pass (RunnerOutput inputs outputs outputs predict) + Pass (RunnerOutput inputs outputs Nil predict) def run (job: Job) (input: RunnerInput): Result RunnerOutput Error = def label = input.getRunnerInputLabel