Skip to content

Commit

Permalink
added more tests
Browse files Browse the repository at this point in the history
  • Loading branch information
nsimakov committed May 14, 2024
1 parent 9f4cfda commit b2c7140
Show file tree
Hide file tree
Showing 11 changed files with 366 additions and 26 deletions.
2 changes: 1 addition & 1 deletion Manifest.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

julia_version = "1.10.2"
manifest_format = "2.0"
project_hash = "4a0a359147081e21854cb9fc59dfd4b6ff618869"
project_hash = "5cb2780489849b022bfa5cc0e98f484c9f216bcd"

[[deps.ANSIColoredPrinters]]
git-tree-sha1 = "574baf8110975760d391c710b6341da1afa48d8c"
Expand Down
1 change: 1 addition & 0 deletions Project.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ DataFrames = "a93c6f00-e57d-5684-b7b6-d8193f3e46c0"
DataFramesMeta = "1313f7d8-7da2-5740-9ea0-a2ca25f37964"
DataStructures = "864edb3b-99cc-5e75-8d2d-829cb0a9cfe8"
DocStringExtensions = "ffbed154-4ef7-542d-bbb7-c09d3a79fcae"
Printf = "de0858da-6303-5e67-8744-51eddeeeb8d7"
Random = "9a3f8284-a2c9-5f02-9a11-845980a1fd5c"
Revise = "295af30f-e4ad-537b-8983-00126c2a3abe"
Statistics = "10745b16-79ce-11e8-11f9-7d13ad32a3b2"
Expand Down
4 changes: 2 additions & 2 deletions examples/simple1.jl
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,5 @@ end

adf, mdf = run!(sim; run_till_no_jobs=true)

println(adf)
println(mdf)
println(adf[1:10,:])
println(mdf[1:10,:])
22 changes: 22 additions & 0 deletions examples/simple_job_trace_replay.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
using Random
using Agents
using HPCMod
using Printf
using DataFrames
using DataStructures

job_trace = DataFrame([
2 1 2 4;
4 2 3 4;
6 2 2 4;
6 1 2 4;
7 1 1 4;
], [
"submit_time", "user_id", "nodes", "walltime"
])

sim = jobs_replay_on_resource(job_trace; nodes=4, scheduler_backfill=false, workload_done_check_freq=1, rng=Random.Xoshiro(123))
show(stdout, "text/plain", Matrix(sim.resource.stats.node_occupancy_by_job))

sim = jobs_replay_on_resource(job_trace; nodes=4, scheduler_backfill=true, workload_done_check_freq=1, rng=Random.Xoshiro(123))
show(stdout, "text/plain", Matrix(sim.resource.stats.node_occupancy_by_job))
5 changes: 5 additions & 0 deletions src/HPCMod.jl
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,12 @@ export add_resource!
export model_step!
# export is_workload_done
export run!
export get_nodename


include("hpc_user_model.jl")

export add_users_and_jobs_from_dataframe
export jobs_replay_on_resource
include("utils.jl")
end
115 changes: 95 additions & 20 deletions src/hpc_user_model.jl
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,14 @@ using DataFrames
import Agents: run!
using Tidier
using DataStructures
using Printf


used_nodes(r::HPCResource) = sum(r.node_used_by_job .!= 0)
used_nodes(m::StandardABM) = used_nodes(m.sim.resource)

get_nodename(node_id) = @sprintf("N%04d", node_id)

"""
if user_id is specified it is up to programmer to add it to user.tasks_to_do (or not)
"""
Expand Down Expand Up @@ -42,15 +45,24 @@ function BatchJob(
nodes::Int64=1,
walltime::Int64=1,
submit_time::Int64=-1,
jobs_list::Union{SortedSet,Nothing}=nothing)::BatchJob
job_id::Int64=-1,
jobs_list::Union{SortedSet,Nothing}=nothing
)::BatchJob

if job_id == -1
sim.last_job_id += 1
else
sim.last_job_id = job_id
sim.last_job_id in keys(sim.jobs_dict) && error("Such job (job_id=$(job_id)) already exists")
end

sim.last_job_id += 1
push!(sim.jobs_list, BatchJob(sim.last_job_id, task, nodes, walltime, submit_time, 0, 0, NotScheduled, []))

sim.jobs_dict[sim.last_job_id] = sim.jobs_list[end]

if jobs_list !== nothing
push!(jobs_list, sim.jobs_list[sim.last_job_id])
push!(jobs_list, sim.jobs_list[end])
end
return sim.jobs_list[sim.last_job_id]
return sim.jobs_list[end]
end

"""
Expand All @@ -68,7 +80,9 @@ function User(
sim.last_user_id += 1
else
sim.last_user_id = user_id
sim.last_user_id in keys(sim.users_dict) && error("Such user (user_id=$(user_id)) already exists")
end

create_time = isnothing(sim.model) ? 0 : abmtime(sim.model)

user = User(
Expand All @@ -84,20 +98,30 @@ function User(
SortedSet{BatchJob}())

push!(sim.users_list, user)
sim.users_dict[sim.last_user_id] = user

# add agent to model
add_agent!(user, sim.model)

return sim.users_list[sim.last_user_id]
return sim.users_list[end]
end

function HPCResourceStats()
HPCResourceStats(1,DataFrame(),DataFrame())
end


function add_resource!(sim::Simulation;
nodes=10,
max_nodes_per_job=4,
max_time_per_job=24 * 3,
scheduler_fifo=true,
scheduler_backfill=true
)::HPCResource
sim.space!==nothing && error("sim.space already initialized")
sim.resource!==nothing && error("sim.resource already initialized")
nodes <=0 && error("nodes should be positive!")

sim.space = GridSpace((nodes,); periodic=false, metric=:manhattan)
sim.resource = HPCResource(
nodes,
Expand All @@ -106,13 +130,26 @@ function add_resource!(sim::Simulation;
fill(Int64(-1), nodes),
[], Dict{Int64,BatchJob}(), [],
max_nodes_per_job, max_time_per_job,
scheduler_fifo, scheduler_backfill
scheduler_fifo, scheduler_backfill,
HPCResourceStats()
)

# init some of sim.resource.stats members
ncol(sim.resource.stats.node_occupancy_by_user)!=0 && error("sim.stats.node_occupancy already initialized")
node_occupancy_by_user = sim.resource.stats.node_occupancy_by_user
node_occupancy_by_user.t = Vector{Int64}()
for node_id in 1:nodes
node_occupancy_by_user[!,get_nodename(node_id)]=Vector{Int64}()
end
node_occupancy_by_job = sim.resource.stats.node_occupancy_by_job
node_occupancy_by_job.t = Vector{Int64}()
for node_id in 1:nodes
node_occupancy_by_job[!,get_nodename(node_id)]=Vector{Int64}()
end

sim.resource
end


"""
Simulation constructor
"""
Expand All @@ -127,16 +164,17 @@ function Simulation(
sim = Simulation(
id,
timeunits_per_day,
0, [],
0, [],
0, [],
0, Vector{CompTask}(),
0, Vector{BatchJob}(),Dict{Int64,BatchJob}(),
0, Vector{User}(), Dict{Int64,User}(),
nothing,
nothing,
nothing,
rng,
nothing,
nothing,
user_extra_step, model_extra_step
user_extra_step, model_extra_step,
1000 # workload_done_check_freq
)

sim.model = StandardABM(
Expand Down Expand Up @@ -215,7 +253,7 @@ function user_step!(sim::Simulation, model::StandardABM, user::User)
push!(job.task.jobs, job.id)
end

# retire compleated active tasks
# retire completed active tasks
i = 1
while i <= length(user.tasks_active)
if user.tasks_active[i].nodetime_left <= 0 && user.tasks_active[i].nodetime_total > 0
Expand Down Expand Up @@ -268,12 +306,15 @@ function place_job!(model::StandardABM, resource::HPCResource, job_position_at_q
run_till = abmtime(model) + job.walltime
job.start_time = abmtime(model)
node_count = 0
for i in 1:resource.nodes
if resource.node_used_by_job[i] == 0
resource.node_used_by_job[i] = job.id
resource.node_released_at[i] = run_till

for node_id in 1:resource.nodes
if resource.node_used_by_job[node_id] == 0
resource.node_used_by_job[node_id] = job.id
resource.node_released_at[node_id] = run_till
node_count += 1

push!(job.nodes_list, node_id)

if node_count == job.nodes
break
end
Expand All @@ -300,6 +341,7 @@ end
function run_scheduler_backfill!(sim::Simulation, model::StandardABM, resource::HPCResource)
while length(resource.queue) > 0
nodes_free = resource.nodes - used_nodes(resource)
#println("Time: $(abmtime(model)) free nodes: $(nodes_free) queue $(length(resource.queue))")

# any jobs with fit by node count
job_fit = findfirst(
Expand Down Expand Up @@ -338,6 +380,12 @@ function run_scheduler!(sim::Simulation, model::StandardABM, resource::HPCResour
end
end


"""
Check for finished jobs
the convention is that job run all the way till current time,
excluding current time.
"""
function check_finished_job!(sim::Simulation, model::StandardABM, resource::HPCResource)
cur_time = abmtime(model)
for i in 1:resource.nodes
Expand All @@ -351,7 +399,6 @@ function check_finished_job!(sim::Simulation, model::StandardABM, resource::HPCR
if resource.node_used_by_job[i2] == job.id
resource.node_used_by_job[i2] = 0
resource.node_released_at[i2] = -1
push!(job.nodes_list, i2)
end
end
push!(resource.history, job)
Expand All @@ -362,10 +409,35 @@ function check_finished_job!(sim::Simulation, model::StandardABM, resource::HPCR
return
end

function model_step_stats!(sim::Simulation)
abmtime(sim.model)%sim.resource.stats.calc_freq!=0 && return

ncol(sim.resource.stats.node_occupancy_by_user)==0 && error("sim.resource.stats.node_occupancy_by_user was not initialized!")
by_user = zeros(Int64,ncol(sim.resource.stats.node_occupancy_by_user))
by_job = zeros(Int64,ncol(sim.resource.stats.node_occupancy_by_user))
by_user[1] = abmtime(sim.model)
by_job[1] = abmtime(sim.model)
for (job_id,job) in sim.resource.executing
for node_id in job.nodes_list
col_id = node_id + 1
by_user[col_id] != 0 && error("node can be occupied only by one job, but it is not!")
by_user[col_id] = job.task.user_id
by_job[col_id] = job_id
end
end
push!(sim.resource.stats.node_occupancy_by_user, by_user)
push!(sim.resource.stats.node_occupancy_by_job, by_job)

end

function model_step!(model::StandardABM)
sim::Simulation = model.sim
# it is right before abmtime(model) time
# check finished job
check_finished_job!(sim, model, model.sim.resource)


# it is abmtime(model) time
# schedule
run_scheduler!(sim, model, model.sim.resource)

Expand All @@ -381,14 +453,16 @@ function model_step!(model::StandardABM)
# schedule
run_scheduler!(sim, model, model.sim.resource)

# more stats
model_step_stats!(sim)

# model extra step
isnothing(sim.model_extra_step) == false && sim.model_extra_step(sim, model)
end

function is_workload_done(model, s)
s % 1000 != 0 && return false

sim = getproperty(model, :sim)
s % sim.workload_done_check_freq != 0 && return false

length(sim.resource.queue) > 0 && return false
length(sim.resource.executing) > 0 && return false
Expand Down Expand Up @@ -434,5 +508,6 @@ function run!(sim::Simulation; nsteps::Int64=-1, run_till_no_jobs::Bool=false)

rename!(sim.adf, [[:time]; [v[1] for v in adata0]])
rename!(sim.mdf, [[:time]; [v[1] for v in mdata0]])

sim.adf, sim.mdf
end
14 changes: 14 additions & 0 deletions src/hpc_user_model_types.jl
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,15 @@ $(TYPEDFIELDS)
inividual_jobs::SortedSet{BatchJob}
end

"""
structure to store various resource usage stats
"""
mutable struct HPCResourceStats
calc_freq::Int64
node_occupancy_by_user::DataFrame
node_occupancy_by_job::DataFrame
end

"""
HPCResource - HPC Resource
struct to track resource occupiency and jobs
Expand All @@ -107,17 +116,21 @@ mutable struct HPCResource
max_time_per_job::Int64
scheduler_fifo::Bool
scheduler_backfill::Bool
stats::HPCResourceStats
end


mutable struct Simulation
id::Int64
timeunits_per_day::Int64
last_task_id::Int64
task_list::Vector{CompTask}
last_job_id::Int64
jobs_list::Vector{BatchJob}
jobs_dict::Dict{Int64,BatchJob}
last_user_id::Int64
users_list::Vector{User}
users_dict::Dict{Int64,User}
resource::Union{HPCResource,Nothing}
space::Union{GridSpace,Nothing}
model::Union{StandardABM,Nothing}
Expand All @@ -134,4 +147,5 @@ mutable struct Simulation
executed at the end of model_step!
"""
model_extra_step::Union{Function,Nothing}
workload_done_check_freq::Int64
end
Loading

0 comments on commit b2c7140

Please sign in to comment.