Mirror of https://github.com/ANL-CEEESA/UnitCommitment.jl.git (synced 2025-12-06 08:18:51 -06:00)

Compare commits: 35dd5ab1a9 ... 1254780e42 (3 commits)

| SHA1 |
|---|
| 1254780e42 |
| ad8ee6fe6b |
| e52798da7a |
```diff
@@ -2,8 +2,6 @@
 # Copyright (C) 2025, UChicago Argonne, LLC. All rights reserved.
 # Released under the modified BSD license. See COPYING.md for more details.

-using UnitCommitment
-
 import Base: put!

 Base.@kwdef mutable struct JobProcessor
@@ -11,7 +9,7 @@ Base.@kwdef mutable struct JobProcessor
     processing::Channel{String} = Channel{String}(Inf)
     shutdown::Channel{Bool} = Channel{Bool}(1)
     worker_task::Union{Task,Nothing} = nothing
-    optimizer = nothing
+    work_fn::Function
 end

 function Base.put!(processor::JobProcessor, job_id::String)
```
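The refactor above swaps the embedded `optimizer` field for an injected `work_fn::Function`, so the processor no longer knows anything about optimization. Under the hood it is a plain producer/consumer built on `Base.Channel`; a minimal, self-contained sketch of that pattern (the names `queue`, `results`, and `worker` are illustrative, not part of the package):

```julia
# Producer/consumer on Base.Channel, the pattern JobProcessor wraps:
# put! enqueues, and iterating the channel blocks until an item arrives
# or the channel is closed and drained.
queue = Channel{String}(Inf)   # unbounded FIFO, like the `pending` field
results = String[]

worker = @async begin
    for job_id in queue        # take!-style iteration; blocks when empty
        push!(results, job_id)
    end
end

put!(queue, "job-1")
put!(queue, "job-2")
close(queue)                   # lets the worker loop finish draining
wait(worker)
@assert results == ["job-1", "job-2"]
```

`JobProcessor` adds a second channel, `processing`, so callers can distinguish "queued" from "in flight", plus a dedicated `shutdown` channel for the stop signal.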
```diff
@@ -38,41 +36,13 @@ function run!(processor::JobProcessor)

         # Move job from pending to processing queue
         job_id = take!(processor.pending)
-        @info "Processing job: $job_id"
-        job_dir = joinpath(basedir, "jobs", job_id)
-        log_path = joinpath(job_dir, "output.log")
         put!(processor.processing, job_id)

-        # Run optimization
-        try
-            open(log_path, "w") do io
-                redirect_stdout(io) do
-                    redirect_stderr(io) do
-                        json_path = joinpath(job_dir, "input.json.gz")
-                        instance = UnitCommitment.read(json_path)
-                        model = UnitCommitment.build_model(;
-                            instance,
-                            optimizer = processor.optimizer,
-                        )
-                        UnitCommitment.optimize!(model)
-                        solution = UnitCommitment.solution(model)
-                        return UnitCommitment.write(
-                            joinpath(job_dir, "output.json"),
-                            solution,
-                        )
-                    end
-                end
-            end
-
-            # Remove job from processing queue
-            take!(processor.processing)
-        catch e
-            open(log_path, "a") do io
-                println(io, "\nError: ", e)
-                println(io, "\nStacktrace:")
-                return Base.show_backtrace(io, catch_backtrace())
-            end
-        end
+        # Run work function
+        processor.work_fn(job_id)
+
+        # Remove job from processing queue
+        take!(processor.processing)
     end
 end

```
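After this change the `processing` channel is purely an in-flight marker: a job id is put there right before `work_fn` runs and removed right after. The `isbusy` helper that the tests below poll is not part of this diff; a plausible sketch, assuming it simply checks whether either channel still holds entries:

```julia
# Hypothetical definition (not shown in this diff): the processor is busy
# while any job is still queued in `pending` or in flight in `processing`.
isbusy(p::JobProcessor) = isready(p.pending) || isready(p.processing)
```

Note the small window between `take!(processor.pending)` and `put!(processor.processing, job_id)` in which both channels are momentarily empty; that is presumably why the server test sleeps briefly before entering its `isbusy` polling loop.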
```diff
@@ -44,14 +44,73 @@ function submit(req, processor::JobProcessor)
 end

 function jobs_view(req)
-    return HTTP.Response(200, "OK")
+    # Extract job_id from URL path /jobs/{job_id}/view
+    path_parts = split(req.target, '/')
+    job_id = path_parts[3] # /jobs/{job_id}/view -> index 3
+
+    # Construct job directory path
+    job_dir = joinpath(basedir, "jobs", job_id)
+
+    # Check if job directory exists
+    if !isdir(job_dir)
+        return HTTP.Response(404, "Job not found")
+    end
+
+    # Read log file if it exists
+    log_path = joinpath(job_dir, "output.log")
+    log_content = isfile(log_path) ? read(log_path, String) : nothing
+
+    # Read output.json if it exists
+    output_path = joinpath(job_dir, "output.json")
+    output_content = isfile(output_path) ? read(output_path, String) : nothing
+
+    # Create response JSON
+    response_data = Dict(
+        "log" => log_content,
+        "solution" => output_content
+    )
+
+    response_body = JSON.json(response_data)
+    return HTTP.Response(200, response_body)
 end

 function start_server(port::Int = 8080; optimizer)
     Random.seed!()
+
+    function work_fn(job_id)
+        job_dir = joinpath(basedir, "jobs", job_id)
+        mkpath(job_dir)
+        input_filename = joinpath(job_dir, "input.json.gz")
+        log_filename = joinpath(job_dir, "output.log")
+        solution_filename = joinpath(job_dir, "output.json")
+        try
+            open(log_filename, "w") do io
+                redirect_stdout(io) do
+                    redirect_stderr(io) do
+                        instance = UnitCommitment.read(input_filename)
+                        model = UnitCommitment.build_model(;
+                            instance,
+                            optimizer = optimizer,
+                        )
+                        UnitCommitment.optimize!(model)
+                        solution = UnitCommitment.solution(model)
+                        UnitCommitment.write(solution_filename, solution)
+                    end
+                end
+            end
+        catch e
+            @error "Failed job: $job_id" e
+            open(log_filename, "a") do io
+                println(io, "\nError: ", e)
+                println(io, "\nStacktrace:")
+                return Base.show_backtrace(io, catch_backtrace())
+            end
+        end
+        return
+    end

     # Create and start job processor
-    processor = JobProcessor(optimizer = optimizer)
+    processor = JobProcessor(; work_fn)
     start(processor)

     router = HTTP.Router()
```
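The new `/jobs/{job_id}/view` endpoint returns a JSON object with `log` and `solution` keys, each holding the corresponding file's contents or `null` when the file does not exist yet. A client-side sketch (the host, port, and job id are placeholders):

```julia
using HTTP, JSON

# Poll a previously submitted job; "0123456789abcdef" is a placeholder id.
resp = HTTP.get("http://localhost:8080/jobs/0123456789abcdef/view")
data = JSON.parse(String(resp.body))
data["log"]       # contents of output.log, or nothing while unwritten
data["solution"]  # contents of output.json, or nothing while still solving
```

The `job_id = path_parts[3]` indexing works because `split` on a path with a leading slash yields an empty first element, so `/jobs/{id}/view` places the id at index 3.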
```diff
@@ -24,7 +24,7 @@ end
 function runtests()
     @testset "UCJL Backend" begin
         server_test_usage()
-        # jobs_test_usage()
+        jobs_test_usage()
     end
     return
 end
```
```diff
@@ -4,40 +4,29 @@

 using Backend
 using Test
-using HiGHS

 function jobs_test_usage()
     @testset "JobProcessor" begin
-        # Setup job directory
-        job_id = "qwe123"
-        job_dir = joinpath(Backend.basedir, "jobs", job_id)
-        mkpath(job_dir)
-        cp(fixture("case14.json.gz"), joinpath(job_dir, "input.json.gz"))
-
-        try
-            # Create processor with HiGHS optimizer
-            processor = JobProcessor(optimizer = HiGHS.Optimizer)
-
-            # Start the worker
-            start(processor)
-
-            # Push job to queue
-            put!(processor, job_id)
-
-            # Wait until all jobs are processed
-            while isbusy(processor)
-                sleep(0.1)
-            end
-
-            # Check that solution file exists
-            output_path = joinpath(job_dir, "output.json")
-            @test isfile(output_path)
-
-            # Stop the worker
-            stop(processor)
-        finally
-            # Cleanup
-            rm(job_dir, recursive = true, force = true)
-        end
+        # Define dummy work function
+        received_job_id = []
+        function work_fn(job_id)
+            push!(received_job_id, job_id)
+        end
+
+        # Create processor with work function
+        processor = JobProcessor(; work_fn)
+
+        # Start the worker
+        start(processor)
+
+        # Push job to queue
+        put!(processor, "test")
+
+        # Wait for job to complete
+        sleep(0.1)
+        stop(processor)
+
+        # Check that the work function was called with correct job_id
+        @test received_job_id[1] == "test"
     end
 end
```
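One caveat: the rewritten test waits a fixed `sleep(0.1)` before `stop(processor)`, which can race on a slow machine. A sketch of a deadline-based alternative, assuming `isbusy` behaves as outlined earlier (`wait_until_idle` is a hypothetical helper, not part of the package):

```julia
# Hypothetical helper: poll until the processor drains or a deadline passes.
function wait_until_idle(processor; timeout = 5.0, poll = 0.05)
    deadline = time() + timeout
    while isbusy(processor)
        time() > deadline && error("processor still busy after $(timeout)s")
        sleep(poll)
    end
    return
end
```

The server test below takes the polling approach (without the deadline) via `while isbusy(server.processor)`.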
```diff
@@ -6,39 +6,55 @@ const PORT = 32617

 function server_test_usage()
     server = Backend.start_server(PORT; optimizer = HiGHS.Optimizer)
+    try
+        # Read the compressed fixture file
+        compressed_data = read(fixture("case14.json.gz"))

-    # Read the compressed fixture file
-    compressed_data = read(fixture("case14.json.gz"))
+        # Submit test case
+        response = HTTP.post(
+            "http://localhost:$PORT/submit",
+            ["Content-Type" => "application/gzip"],
+            compressed_data,
+        )
+        @test response.status == 200

-    # Submit test case
-    response = HTTP.post(
-        "http://localhost:$PORT/submit",
-        ["Content-Type" => "application/gzip"],
-        compressed_data,
-    )
-    @test response.status == 200
+        # Check response
+        response_data = JSON.parse(String(response.body))
+        @test haskey(response_data, "job_id")
+        job_id = response_data["job_id"]
+        @test length(job_id) == 16

-    # Check response
-    response_data = JSON.parse(String(response.body))
-    @test haskey(response_data, "job_id")
-    job_id = response_data["job_id"]
-    @test length(job_id) == 16
+        # Wait for jobs to finish
+        sleep(0.1)
+        while isbusy(server.processor)
+            sleep(0.1)
+        end

-    # Wait for jobs to finish and stop server
-    sleep(0.1)
-    stop(server)
+        # Verify the compressed file was saved correctly
+        job_dir = joinpath(Backend.basedir, "jobs", job_id)
+        saved_input_path = joinpath(job_dir, "input.json.gz")
+        saved_log_path = joinpath(job_dir, "output.log")
+        saved_output_path = joinpath(job_dir, "output.json")
+        @test isfile(saved_input_path)
+        @test isfile(saved_log_path)
+        @test isfile(saved_output_path)
+        saved_data = read(saved_input_path)
+        @test saved_data == compressed_data

-    # Verify the compressed file was saved correctly
-    job_dir = joinpath(Backend.basedir, "jobs", job_id)
-    saved_input_path = joinpath(job_dir, "input.json.gz")
-    saved_log_path = joinpath(job_dir, "output.log")
-    saved_output_path = joinpath(job_dir, "output.json")
-    @test isfile(saved_input_path)
-    @test isfile(saved_log_path)
-    @test isfile(saved_output_path)
-    saved_data = read(saved_input_path)
-    @test saved_data == compressed_data
+        # Query job information
+        view_response = HTTP.get("http://localhost:$PORT/jobs/$job_id/view")
+        @test view_response.status == 200
+
+        # Check response
+        view_data = JSON.parse(String(view_response.body))
+        @test haskey(view_data, "log")
+        @test haskey(view_data, "solution")
+        @test view_data["log"] !== nothing
+        @test view_data["solution"] !== nothing

-    # Clean up: remove the job directory
-    # rm(job_dir, recursive=true)
+        # Clean up
+        rm(job_dir, recursive=true)
+    finally
+        stop(server)
+    end
 end
```
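The `@test length(job_id) == 16` assertion, together with the `Random.seed!()` call in `start_server`, suggests job ids come from something like `Random.randstring(16)`; the generating code itself is outside this diff. For reference, a matching sketch under that assumption:

```julia
using Random

Random.seed!()           # reseed from system entropy, as start_server does
job_id = randstring(16)  # 16 random alphanumeric characters
```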