From 35dd5ab1a9c96e7e7e4b7d0b2bdc957b7ece1920 Mon Sep 17 00:00:00 2001 From: "Alinson S. Xavier" Date: Thu, 6 Nov 2025 15:22:19 -0600 Subject: [PATCH] web: backend: Implement job queue --- web/backend/src/Backend.jl | 59 +---------------- web/backend/src/jobs.jl | 99 +++++++++++++++++++++++++++++ web/backend/src/server.jl | 73 +++++++++++++++++++++ web/backend/test/src/BackendT.jl | 65 +++---------------- web/backend/test/src/jobs_test.jl | 43 +++++++++++++ web/backend/test/src/server_test.jl | 44 +++++++++++++ 6 files changed, 270 insertions(+), 113 deletions(-) create mode 100644 web/backend/src/jobs.jl create mode 100644 web/backend/src/server.jl create mode 100644 web/backend/test/src/jobs_test.jl create mode 100644 web/backend/test/src/server_test.jl diff --git a/web/backend/src/Backend.jl b/web/backend/src/Backend.jl index 548a911..438d1c7 100644 --- a/web/backend/src/Backend.jl +++ b/web/backend/src/Backend.jl @@ -4,64 +4,9 @@ module Backend -using HTTP -using Random -using JSON -using CodecZlib -using UnitCommitment - basedir = joinpath(dirname(@__FILE__), "..") -function submit(req; optimizer) - # Check if request body is empty - compressed_body = HTTP.payload(req) - if isempty(compressed_body) - return HTTP.Response(400, "Error: No file provided") - end - - # Validate compressed JSON by decompressing and parsing - try - decompressed_data = transcode(GzipDecompressor, compressed_body) - JSON.parse(String(decompressed_data)) - catch e - return HTTP.Response(400, "Error: Invalid compressed JSON") - end - - # Generate random job ID (lowercase letters and numbers) - job_id = randstring(['a':'z'; '0':'9'], 16) - - # Create job directory - job_dir = joinpath(basedir, "jobs", job_id) - mkpath(job_dir) - - # Save input file - json_path = joinpath(job_dir, "input.json.gz") - write(json_path, compressed_body) - - # Optimize file - instance = UnitCommitment.read(json_path) - model = UnitCommitment.build_model(; instance, optimizer) - UnitCommitment.optimize!(model) - solution = UnitCommitment.solution(model) - UnitCommitment.write("$job_dir/output.json", solution) - - # Return job ID as JSON - response_body = JSON.json(Dict("job_id" => job_id)) - return HTTP.Response(200, response_body) -end - -function jobs_view(req) - return HTTP.Response(200, "OK") -end - - -function start_server(port::Int = 8080; optimizer) - Random.seed!() - router = HTTP.Router() - HTTP.register!(router, "POST", "/submit", req -> submit(req; optimizer)) - HTTP.register!(router, "GET", "/jobs/*/view", jobs_view) - server = HTTP.serve!(router, port; verbose = false) - return server -end +include("jobs.jl") +include("server.jl") end diff --git a/web/backend/src/jobs.jl b/web/backend/src/jobs.jl new file mode 100644 index 0000000..0149534 --- /dev/null +++ b/web/backend/src/jobs.jl @@ -0,0 +1,99 @@ +# UnitCommitment.jl: Optimization Package for Security-Constrained Unit Commitment +# Copyright (C) 2025, UChicago Argonne, LLC. All rights reserved. +# Released under the modified BSD license. See COPYING.md for more details. + +using UnitCommitment + +import Base: put! + +Base.@kwdef mutable struct JobProcessor + pending::Channel{String} = Channel{String}(Inf) + processing::Channel{String} = Channel{String}(Inf) + shutdown::Channel{Bool} = Channel{Bool}(1) + worker_task::Union{Task,Nothing} = nothing + optimizer = nothing +end + +function Base.put!(processor::JobProcessor, job_id::String) + @info "New job received: $job_id" + return put!(processor.pending, job_id) +end + +function isbusy(processor::JobProcessor) + return isready(processor.pending) || isready(processor.processing) +end + +function run!(processor::JobProcessor) + while true + # Check for shutdown signal + if isready(processor.shutdown) + break + end + + # Wait for a job with timeout + if !isready(processor.pending) + sleep(0.1) + continue + end + + # Move job from pending to processing queue + job_id = take!(processor.pending) + @info "Processing job: $job_id" + job_dir = joinpath(basedir, "jobs", job_id) + log_path = joinpath(job_dir, "output.log") + put!(processor.processing, job_id) + + # Run optimization + try + open(log_path, "w") do io + redirect_stdout(io) do + redirect_stderr(io) do + json_path = joinpath(job_dir, "input.json.gz") + instance = UnitCommitment.read(json_path) + model = UnitCommitment.build_model(; + instance, + optimizer = processor.optimizer, + ) + UnitCommitment.optimize!(model) + solution = UnitCommitment.solution(model) + return UnitCommitment.write( + joinpath(job_dir, "output.json"), + solution, + ) + end + end + end + + # Remove job from processing queue + take!(processor.processing) + catch e + open(log_path, "a") do io + println(io, "\nError: ", e) + println(io, "\nStacktrace:") + return Base.show_backtrace(io, catch_backtrace()) + end + end + end +end + +function start(processor::JobProcessor) + processor.worker_task = @async run!(processor) + return +end + +function stop(processor::JobProcessor) + # Signal worker to stop + put!(processor.shutdown, true) + + # Wait for worker to finish + if processor.worker_task !== nothing + try + wait(processor.worker_task) + catch + # Worker may have already exited + end + end + return +end + +export JobProcessor, start, stop, put!, isbusy diff --git a/web/backend/src/server.jl b/web/backend/src/server.jl new file mode 100644 index 0000000..2424c0e --- /dev/null +++ b/web/backend/src/server.jl @@ -0,0 +1,73 @@ +using HTTP +using Random +using JSON +using CodecZlib +using UnitCommitment + +struct ServerHandle + server::HTTP.Server + processor::JobProcessor +end + +function submit(req, processor::JobProcessor) + # Check if request body is empty + compressed_body = HTTP.payload(req) + if isempty(compressed_body) + return HTTP.Response(400, "Error: No file provided") + end + + # Validate compressed JSON by decompressing and parsing + try + decompressed_data = transcode(GzipDecompressor, compressed_body) + JSON.parse(String(decompressed_data)) + catch e + return HTTP.Response(400, "Error: Invalid compressed JSON") + end + + # Generate random job ID (lowercase letters and numbers) + job_id = randstring(['a':'z'; '0':'9'], 16) + + # Create job directory + job_dir = joinpath(basedir, "jobs", job_id) + mkpath(job_dir) + + # Save input file + json_path = joinpath(job_dir, "input.json.gz") + write(json_path, compressed_body) + + # Add job to queue + put!(processor, job_id) + + # Return job ID as JSON + response_body = JSON.json(Dict("job_id" => job_id)) + return HTTP.Response(200, response_body) +end + +function jobs_view(req) + return HTTP.Response(200, "OK") +end + +function start_server(port::Int = 8080; optimizer) + Random.seed!() + + # Create and start job processor + processor = JobProcessor(optimizer = optimizer) + start(processor) + + router = HTTP.Router() + + # Register /submit endpoint + HTTP.register!(router, "POST", "/submit", req -> submit(req, processor)) + + # Register job/*/view endpoint + HTTP.register!(router, "GET", "/jobs/*/view", jobs_view) + + server = HTTP.serve!(router, port; verbose = false) + return ServerHandle(server, processor) +end + +function stop(handle::ServerHandle) + stop(handle.processor) + close(handle.server) + return nothing +end diff --git a/web/backend/test/src/BackendT.jl b/web/backend/test/src/BackendT.jl index 3f6e3bb..1359fef 100644 --- a/web/backend/test/src/BackendT.jl +++ b/web/backend/test/src/BackendT.jl @@ -12,73 +12,26 @@ import Backend import JuliaFormatter using HiGHS -basedir = dirname(@__FILE__) -port = 32617 +BASEDIR = dirname(@__FILE__) + +include("jobs_test.jl") +include("server_test.jl") function fixture(path::String)::String - return "$basedir/../fixtures/$path" -end - -function with_server(f) - logger = Test.TestLogger() - # server = Base.CoreLogging.with_logger(logger) do - # Backend.start_server(port) - # end - server = Backend.start_server(port; optimizer=HiGHS.Optimizer) - try - f() - finally - close(server) - end - return filter!(x -> x.group == :access, logger.logs) -end - -function test_usage() - with_server() do - # Read the compressed fixture file - compressed_data = read(fixture("case14.json.gz")) - - # Submit test case - response = HTTP.post( - "http://localhost:$port/submit", - ["Content-Type" => "application/gzip"], - compressed_data - ) - @test response.status == 200 - - # Check response - response_data = JSON.parse(String(response.body)) - @test haskey(response_data, "job_id") - job_id = response_data["job_id"] - @test length(job_id) == 16 - @test all(c -> c in ['a':'z'; '0':'9'], collect(job_id)) - - # Verify the compressed file was saved correctly - job_dir = joinpath(Backend.basedir, "jobs", job_id) - saved_path = joinpath(job_dir, "input.json.gz") - @test isfile(saved_path) - saved_data = read(saved_path) - @test saved_data == compressed_data - - response = HTTP.get("http://localhost:$port/jobs/123/view") - @test response.status == 200 - @test String(response.body) == "OK" - - # Clean up: remove the job directory - # rm(job_dir, recursive=true) - end + return "$BASEDIR/../fixtures/$path" end function runtests() @testset "UCJL Backend" begin - test_usage() + server_test_usage() + # jobs_test_usage() end return end function format() - JuliaFormatter.format(basedir, verbose = true) - JuliaFormatter.format("$basedir/../../src", verbose = true) + JuliaFormatter.format(BASEDIR, verbose = true) + JuliaFormatter.format("$BASEDIR/../../src", verbose = true) return end diff --git a/web/backend/test/src/jobs_test.jl b/web/backend/test/src/jobs_test.jl new file mode 100644 index 0000000..a38cdab --- /dev/null +++ b/web/backend/test/src/jobs_test.jl @@ -0,0 +1,43 @@ +# UnitCommitment.jl: Optimization Package for Security-Constrained Unit Commitment +# Copyright (C) 2025, UChicago Argonne, LLC. All rights reserved. +# Released under the modified BSD license. See COPYING.md for more details. + +using Backend +using Test +using HiGHS + +function jobs_test_usage() + @testset "JobProcessor" begin + # Setup job directory + job_id = "qwe123" + job_dir = joinpath(Backend.basedir, "jobs", job_id) + mkpath(job_dir) + cp(fixture("case14.json.gz"), joinpath(job_dir, "input.json.gz")) + + try + # Create processor with HiGHS optimizer + processor = JobProcessor(optimizer = HiGHS.Optimizer) + + # Start the worker + start(processor) + + # Push job to queue + put!(processor, job_id) + + # Wait until all jobs are processed + while isbusy(processor) + sleep(0.1) + end + + # Check that solution file exists + output_path = joinpath(job_dir, "output.json") + @test isfile(output_path) + + # Stop the worker + stop(processor) + finally + # Cleanup + rm(job_dir, recursive = true, force = true) + end + end +end diff --git a/web/backend/test/src/server_test.jl b/web/backend/test/src/server_test.jl new file mode 100644 index 0000000..895d7e5 --- /dev/null +++ b/web/backend/test/src/server_test.jl @@ -0,0 +1,44 @@ +# UnitCommitment.jl: Optimization Package for Security-Constrained Unit Commitment +# Copyright (C) 2025, UChicago Argonne, LLC. All rights reserved. +# Released under the modified BSD license. See COPYING.md for more details. + +const PORT = 32617 + +function server_test_usage() + server = Backend.start_server(PORT; optimizer = HiGHS.Optimizer) + + # Read the compressed fixture file + compressed_data = read(fixture("case14.json.gz")) + + # Submit test case + response = HTTP.post( + "http://localhost:$PORT/submit", + ["Content-Type" => "application/gzip"], + compressed_data, + ) + @test response.status == 200 + + # Check response + response_data = JSON.parse(String(response.body)) + @test haskey(response_data, "job_id") + job_id = response_data["job_id"] + @test length(job_id) == 16 + + # Wait for jobs to finish and stop server + sleep(0.1) + stop(server) + + # Verify the compressed file was saved correctly + job_dir = joinpath(Backend.basedir, "jobs", job_id) + saved_input_path = joinpath(job_dir, "input.json.gz") + saved_log_path = joinpath(job_dir, "output.log") + saved_output_path = joinpath(job_dir, "output.json") + @test isfile(saved_input_path) + @test isfile(saved_log_path) + @test isfile(saved_output_path) + saved_data = read(saved_input_path) + @test saved_data == compressed_data + + # Clean up: remove the job directory + # rm(job_dir, recursive=true) +end