diff --git a/Project.toml b/Project.toml index 88aca27b..995839b0 100644 --- a/Project.toml +++ b/Project.toml @@ -26,5 +26,5 @@ Conda = "1" JSON = "0.18,0.19,0.20,0.21,1" MbedTLS = "0.5,0.6,0.7,1" SoftGlobalScope = "1" -ZMQ = "1" +ZMQ = "1.3" julia = "1.6" diff --git a/src/handlers.jl b/src/handlers.jl index 4e84f709..150f1307 100644 --- a/src/handlers.jl +++ b/src/handlers.jl @@ -189,6 +189,9 @@ function connect_request(socket, msg) end function shutdown_request(socket, msg) + # stop heartbeat thread by closing the context + close(zmq_proxy_context[]) + send_ipython(requests[], msg_reply(msg, "shutdown_reply", msg.content)) sleep(0.1) # short delay (like in ipykernel), to hopefully ensure shutdown_reply is sent diff --git a/src/heartbeat.jl b/src/heartbeat.jl index c0e58e4b..e11991fa 100644 --- a/src/heartbeat.jl +++ b/src/heartbeat.jl @@ -7,10 +7,13 @@ import Libdl const threadid = zeros(Int, 128) # sizeof(uv_thread_t) <= 8 on Linux, OSX, Win -const zmq_proxy = Ref(C_NULL) +const zmq_proxy_context = Ref{Context}() # entry point for new thread -function heartbeat_thread(sock::Ptr{Cvoid}) +function heartbeat_thread(heartbeat_addr::Cstring) + zmq_proxy_context[] = Context() + heartbeat = Socket(zmq_proxy_context[], ROUTER) + GC.@preserve heartbeat_addr bind(heartbeat, unsafe_string(heartbeat_addr)) @static if VERSION ≥ v"1.9.0-DEV.1588" # julia#46609 # julia automatically "adopts" this thread because # we entered a Julia cfunction. We then have to enable @@ -19,14 +22,20 @@ function heartbeat_thread(sock::Ptr{Cvoid}) # (see julia#47196) ccall(:jl_gc_safe_enter, Int8, ()) end - ccall(zmq_proxy[], Cint, (Ptr{Cvoid}, Ptr{Cvoid}, Ptr{Cvoid}), - sock, sock, C_NULL) - nothing + ret = ZMQ.lib.zmq_proxy(heartbeat, heartbeat, C_NULL) + @static if VERSION ≥ v"1.9.0-DEV.1588" # julia#46609 + # julia automatically "adopts" this thread because + # we entered a Julia cfunction. We then have to enable + # a GC "safe" region to prevent us from grabbing the + # GC lock with the call to zmq_proxy, which never returns. + # (see julia#47196) + ccall(:jl_gc_safe_leave, Int8, ()) + end + return ret end -function start_heartbeat(sock) - zmq_proxy[] = Libdl.dlsym(Libdl.dlopen(ZMQ.libzmq), :zmq_proxy) - heartbeat_c = @cfunction(heartbeat_thread, Cvoid, (Ptr{Cvoid},)) - ccall(:uv_thread_create, Cint, (Ptr{Int}, Ptr{Cvoid}, Ptr{Cvoid}), - threadid, heartbeat_c, sock) +function start_heartbeat(heartbeat_addr) + heartbeat_c = @cfunction(heartbeat_thread, Cint, (Cstring,)) + ccall(:uv_thread_create, Cint, (Ptr{Int}, Ptr{Cvoid}, Cstring), + threadid, heartbeat_c, heartbeat_addr) end diff --git a/src/init.jl b/src/init.jl index 231a33d7..c613c3f9 100644 --- a/src/init.jl +++ b/src/init.jl @@ -24,7 +24,6 @@ const publish = Ref{Socket}() const raw_input = Ref{Socket}() const requests = Ref{Socket}() const control = Ref{Socket}() -const heartbeat = Ref{Socket}() const profile = Dict{String,Any}() const read_stdout = Ref{Base.PipeEndpoint}() const read_stderr = Ref{Base.PipeEndpoint}() @@ -87,21 +86,19 @@ function init(args) raw_input[] = Socket(ROUTER) requests[] = Socket(ROUTER) control[] = Socket(ROUTER) - heartbeat[] = Socket(ROUTER) sep = profile["transport"]=="ipc" ? "-" : ":" bind(publish[], "$(profile["transport"])://$(profile["ip"])$(sep)$(profile["iopub_port"])") bind(requests[], "$(profile["transport"])://$(profile["ip"])$(sep)$(profile["shell_port"])") bind(control[], "$(profile["transport"])://$(profile["ip"])$(sep)$(profile["control_port"])") bind(raw_input[], "$(profile["transport"])://$(profile["ip"])$(sep)$(profile["stdin_port"])") - bind(heartbeat[], "$(profile["transport"])://$(profile["ip"])$(sep)$(profile["hb_port"])") + start_heartbeat("$(profile["transport"])://$(profile["ip"])$(sep)$(profile["hb_port"])") # associate a lock with each socket so that multi-part messages # on a given socket don't get inter-mingled between tasks. - for s in (publish[], raw_input[], requests[], control[], heartbeat[]) + for s in (publish[], raw_input[], requests[], control[]) socket_locks[s] = ReentrantLock() end - start_heartbeat(heartbeat[]) if capture_stdout read_stdout[], = redirect_stdout() redirect_stdout(IJuliaStdio(stdout,"stdout"))