diff --git a/web/backend/Project.toml b/web/backend/Project.toml index 00b09c8..46cacf7 100644 --- a/web/backend/Project.toml +++ b/web/backend/Project.toml @@ -5,13 +5,21 @@ authors = ["Alinson S. Xavier "] [deps] CodecZlib = "944b1d66-785c-5afd-91f1-9de20f533193" +Dates = "ade2ca70-3891-5945-98fb-dc099432e06a" +Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b" HTTP = "cd3eb016-35fb-5094-929b-558a96fad6f3" JSON = "682c06a0-de6a-54ab-a142-c8b1cf79cde6" +Logging = "56ddb016-857b-54e1-b83d-db4d58db5568" +Printf = "de0858da-6303-5e67-8744-51eddeeeb8d7" Random = "9a3f8284-a2c9-5f02-9a11-845980a1fd5c" UnitCommitment = "64606440-39ea-11e9-0f29-3303a1d3d877" [compat] CodecZlib = "0.7.8" +Dates = "1.11.0" +Distributed = "1.11.0" HTTP = "1.10.19" JSON = "0.21.4" +Logging = "1.11.0" +Printf = "1.11.0" Random = "1.11.0" diff --git a/web/backend/src/Backend.jl b/web/backend/src/Backend.jl index 438d1c7..7c00293 100644 --- a/web/backend/src/Backend.jl +++ b/web/backend/src/Backend.jl @@ -8,5 +8,6 @@ basedir = joinpath(dirname(@__FILE__), "..") include("jobs.jl") include("server.jl") +include("log.jl") end diff --git a/web/backend/src/jobs.jl b/web/backend/src/jobs.jl index 594af56..3be9154 100644 --- a/web/backend/src/jobs.jl +++ b/web/backend/src/jobs.jl @@ -2,18 +2,19 @@ # Copyright (C) 2025, UChicago Argonne, LLC. All rights reserved. # Released under the modified BSD license. See COPYING.md for more details. +using Distributed import Base: put! Base.@kwdef mutable struct JobProcessor - pending::Channel{String} = Channel{String}(Inf) - processing::Channel{String} = Channel{String}(Inf) - shutdown::Channel{Bool} = Channel{Bool}(1) - worker_task::Union{Task,Nothing} = nothing - work_fn::Function + pending = RemoteChannel(() -> Channel{String}(Inf)) + processing = RemoteChannel(() -> Channel{String}(Inf)) + shutdown = RemoteChannel(() -> Channel{Bool}(1)) + worker_pid = nothing + monitor_task = nothing + work_fn = nothing end function Base.put!(processor::JobProcessor, job_id::String) - @info "New job received: $job_id" return put!(processor.pending, job_id) end @@ -21,46 +22,58 @@ function isbusy(processor::JobProcessor) return isready(processor.pending) || isready(processor.processing) end -function run!(processor::JobProcessor) +function worker_loop(pending, processing, shutdown, work_fn) + @info "Starting worker loop" while true # Check for shutdown signal - if isready(processor.shutdown) + if isready(shutdown) + @info "Shutdown signal received" break end # Wait for a job with timeout - if !isready(processor.pending) + if !isready(pending) sleep(0.1) continue end # Move job from pending to processing queue - job_id = take!(processor.pending) - put!(processor.processing, job_id) + job_id = take!(pending) + put!(processing, job_id) + @info "Job started: $job_id" # Run work function - processor.work_fn(job_id) + try + work_fn(job_id) + catch e + @error "Job failed: job $job_id" + end # Remove job from processing queue - take!(processor.processing) + take!(processing) + @info "Job finished: $job_id" end end function start(processor::JobProcessor) - processor.worker_task = Threads.@spawn run!(processor) + processor.monitor_task = @spawn begin + worker_loop( + processor.pending, + processor.processing, + processor.shutdown, + processor.work_fn, + ) + end return end function stop(processor::JobProcessor) - # Signal worker to stop put!(processor.shutdown, true) - - # Wait for worker to finish - if processor.worker_task !== nothing + if processor.monitor_task !== nothing try - wait(processor.worker_task) - catch - # Worker may have already exited + wait(processor.monitor_task) + catch e + @warn "Error waiting for worker task" exception=e end end return diff --git a/web/backend/src/log.jl b/web/backend/src/log.jl new file mode 100644 index 0000000..528c8da --- /dev/null +++ b/web/backend/src/log.jl @@ -0,0 +1,36 @@ +# UnitCommitment.jl: Optimization Package for Security-Constrained Unit Commitment +# Copyright (C) 2025, UChicago Argonne, LLC. All rights reserved. +# Released under the modified BSD license. See COPYING.md for more details. + +import Logging: min_enabled_level, shouldlog, handle_message +using Base.CoreLogging, Logging, Dates + +struct TimeLogger <: AbstractLogger end + +min_enabled_level(::TimeLogger) = CoreLogging.Info +shouldlog(logger::TimeLogger, level, _module, group, id) = true + +function handle_message( + logger::TimeLogger, + level, + message, + _module, + group, + id, + filepath, + line; + kwargs..., +) + current_time = Dates.format(now(), "yyyy-mm-dd HH:MM:SS.sss") + print("[$current_time] ") + println(message) + flush(stdout) + flush(stderr) + return Base.Libc.flush_cstdio() +end + +function setup_logger() + global_logger(TimeLogger()) + @spawn global_logger(TimeLogger()) + return +end diff --git a/web/backend/src/server.jl b/web/backend/src/server.jl index 70afc6a..8f7261c 100644 --- a/web/backend/src/server.jl +++ b/web/backend/src/server.jl @@ -107,7 +107,6 @@ function start_server(host, port; optimizer) end end catch e - @error "Failed job: $job_id" e open(log_filename, "a") do io println(io, "\nError: ", e) println(io, "\nStacktrace:") diff --git a/web/backend/test/Project.toml b/web/backend/test/Project.toml index c1c0ddf..d40bcc8 100644 --- a/web/backend/test/Project.toml +++ b/web/backend/test/Project.toml @@ -6,18 +6,18 @@ authors = ["Alinson S. Xavier "] [deps] Backend = "948642ed-e3f9-4642-9296-0f1eaf40c938" CodecZlib = "944b1d66-785c-5afd-91f1-9de20f533193" +Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b" HTTP = "cd3eb016-35fb-5094-929b-558a96fad6f3" HiGHS = "87dc4568-4c63-4d18-b0c0-bb2238e4078b" JSON = "682c06a0-de6a-54ab-a142-c8b1cf79cde6" JuliaFormatter = "98e50ef6-434e-11e9-1051-2b60c6c9e899" -Revise = "295af30f-e4ad-537b-8983-00126c2a3abe" Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40" [compat] CodecZlib = "0.7.8" +Distributed = "1.11.0" HTTP = "1.10.19" HiGHS = "1.20.1" JSON = "0.21.4" JuliaFormatter = "2.2.0" -Revise = "3.12.0" Test = "1.11.0" diff --git a/web/backend/test/src/BackendT.jl b/web/backend/test/src/BackendT.jl index 2e34442..e750e5d 100644 --- a/web/backend/test/src/BackendT.jl +++ b/web/backend/test/src/BackendT.jl @@ -4,6 +4,8 @@ module BackendT +using Distributed + using Test using HTTP using JSON @@ -22,6 +24,7 @@ function fixture(path::String)::String end function runtests() + Backend.setup_logger() @testset "UCJL Backend" begin server_test_usage() jobs_test_usage() diff --git a/web/backend/test/src/jobs_test.jl b/web/backend/test/src/jobs_test.jl index 58d5179..5930376 100644 --- a/web/backend/test/src/jobs_test.jl +++ b/web/backend/test/src/jobs_test.jl @@ -7,10 +7,14 @@ using Test function jobs_test_usage() @testset "JobProcessor" begin - # Define dummy work function - received_job_id = [] + # Create a temporary directory for test output + test_dir = mktempdir() + + # Define dummy work function that writes to a file + # Note: This function will be executed on a worker process function work_fn(job_id) - push!(received_job_id, job_id) + output_file = joinpath(test_dir, job_id * ".txt") + write(output_file, job_id) return end @@ -24,10 +28,16 @@ function jobs_test_usage() put!(processor, "test") # Wait for job to complete - sleep(0.1) + # Increased timeout to account for worker process startup + sleep(2) stop(processor) # Check that the work function was called with correct job_id - @test received_job_id[1] == "test" + output_file = joinpath(test_dir, "test.txt") + @test isfile(output_file) + @test read(output_file, String) == "test" + + # Clean up + rm(test_dir; recursive = true) end end diff --git a/web/backend/test/src/server_test.jl b/web/backend/test/src/server_test.jl index 8760bd0..b5e80d4 100644 --- a/web/backend/test/src/server_test.jl +++ b/web/backend/test/src/server_test.jl @@ -13,7 +13,7 @@ function server_test_usage() # Submit test case response = HTTP.post( - "http://$HOST:$PORT/submit", + "http://$HOST:$PORT/api/submit", ["Content-Type" => "application/gzip"], compressed_data, ) @@ -26,7 +26,7 @@ function server_test_usage() @test length(job_id) == 16 # Wait for jobs to finish - sleep(0.1) + sleep(5) while isbusy(server.processor) sleep(0.1) end @@ -43,7 +43,7 @@ function server_test_usage() @test saved_data == compressed_data # Query job information - view_response = HTTP.get("http://$HOST:$PORT/jobs/$job_id/view") + view_response = HTTP.get("http://$HOST:$PORT/api/jobs/$job_id/view") @test view_response.status == 200 # Check response