diff --git a/Gemfile b/Gemfile index 705e55eb..8858a2e2 100644 --- a/Gemfile +++ b/Gemfile @@ -18,4 +18,5 @@ group :test do gem "connection_pool", "~> 2.0" gem "rbnacl" gem "domain_name" + gem "websocket-client-simple" end diff --git a/lib/rage-rb.rb b/lib/rage-rb.rb index 748f8afd..5f9094c4 100644 --- a/lib/rage-rb.rb +++ b/lib/rage-rb.rb @@ -7,27 +7,17 @@ module Rage def self.application - app = Application.new(__router) - - config.middleware.middlewares.reverse.inject(app) do |next_in_chain, (middleware, args, block)| - # in Rails compatibility mode we first check if the middleware is a part of the Rails middleware stack; - # if it is - it is expected to be built using `ActionDispatch::MiddlewareStack::Middleware#build` - if Rage.config.internal.rails_mode - rails_middleware = Rails.application.config.middleware.middlewares.find { |m| m.name == middleware.name } - end - - if rails_middleware - rails_middleware.build(next_in_chain) - else - middleware.new(next_in_chain, *args, &block) - end - end + with_middlewares(Application.new(__router), config.middleware.middlewares) end def self.multi_application Rage::Router::Util::Cascade.new(application, Rails.application) end + def self.cable + Rage::Cable + end + def self.routes Rage::Router::DSL.new(__router) end @@ -90,6 +80,23 @@ def self.patch_active_record_connection_pool end end + # @private + def self.with_middlewares(app, middlewares) + middlewares.reverse.inject(app) do |next_in_chain, (middleware, args, block)| + # in Rails compatibility mode we first check if the middleware is a part of the Rails middleware stack; + # if it is - it is expected to be built using `ActionDispatch::MiddlewareStack::Middleware#build` + if Rage.config.internal.rails_mode + rails_middleware = Rails.application.config.middleware.middlewares.find { |m| m.name == middleware.name } + end + + if rails_middleware + rails_middleware.build(next_in_chain) + else + middleware.new(next_in_chain, *args, &block) + end + end + end + module Router module Strategies end @@ -106,6 +113,7 @@ module ActiveRecord autoload :Cookies, "rage/cookies" autoload :Session, "rage/session" + autoload :Cable, "rage/cable/cable" end module RageController diff --git a/lib/rage/all.rb b/lib/rage/all.rb index cf9fcd05..c3921b55 100644 --- a/lib/rage/all.rb +++ b/lib/rage/all.rb @@ -26,6 +26,7 @@ require_relative "logger/json_formatter" require_relative "logger/logger" +require_relative "middleware/origin_validator" require_relative "middleware/fiber_wrapper" require_relative "middleware/cors" require_relative "middleware/reloader" diff --git a/lib/rage/cable/cable.rb b/lib/rage/cable/cable.rb new file mode 100644 index 00000000..7326043f --- /dev/null +++ b/lib/rage/cable/cable.rb @@ -0,0 +1,130 @@ +# frozen_string_literal: true + +module Rage::Cable + # Create a new Cable application. + # + # @example + # map "/cable" do + # run Rage.cable.application + # end + def self.application + protocol = Rage.config.cable.protocol + protocol.init(__router) + + handler = __build_handler(protocol) + accept_response = [0, protocol.protocol_definition, []] + + application = ->(env) do + if env["rack.upgrade?"] == :websocket + env["rack.upgrade"] = handler + accept_response + else + [426, { "Connection" => "Upgrade", "Upgrade" => "websocket" }, []] + end + end + + Rage.with_middlewares(application, Rage.config.cable.middlewares) + end + + # @private + def self.__router + @__router ||= Router.new + end + + # @private + def self.__build_handler(protocol) + klass = Class.new do + def initialize(protocol) + Iodine.on_state(:on_start) do + unless Fiber.scheduler + Fiber.set_scheduler(Rage::FiberScheduler.new) + end + end + + @protocol = protocol + end + + def on_open(connection) + Fiber.schedule do + @protocol.on_open(connection) + rescue => e + log_error(e) + end + end + + def on_message(connection, data) + Fiber.schedule do + @protocol.on_message(connection, data) + rescue => e + log_error(e) + end + end + + if protocol.respond_to?(:on_close) + def on_close(connection) + return unless ::Iodine.running? + + Fiber.schedule do + @protocol.on_close(connection) + rescue => e + log_error(e) + end + end + end + + if protocol.respond_to?(:on_shutdown) + def on_shutdown(connection) + @protocol.on_shutdown(connection) + rescue => e + log_error(e) + end + end + + private + + def log_error(e) + Rage.logger.error("Unhandled exception has occured - #{e.class} (#{e.message}):\n#{e.backtrace.join("\n")}") + end + end + + klass.new(protocol) + end + + # Broadcast data directly to a named stream. + # + # @param stream [String] the name of the stream + # @param data [Object] the object to send to the clients. This will later be encoded according to the protocol used. + # @example + # Rage.cable.broadcast("chat", { message: "A new member has joined!" }) + def self.broadcast(stream, data) + Rage.config.cable.protocol.broadcast(stream, data) + end + + # @!parse [ruby] + # # @abstract + # class WebSocketConnection + # # Write data to the connection. + # # + # # @param data [String] the data to write + # def write(data) + # end + # + # # Subscribe to a channel. + # # + # # @param name [String] the channel name + # def subscribe(name) + # end + # + # # Close the connection. + # def close + # end + # end + + module Protocol + end +end + +require_relative "protocol/actioncable_v1_json" +require_relative "channel" +require_relative "connection" +require_relative "router" diff --git a/lib/rage/cable/channel.rb b/lib/rage/cable/channel.rb new file mode 100644 index 00000000..5377578c --- /dev/null +++ b/lib/rage/cable/channel.rb @@ -0,0 +1,452 @@ +require "set" + +class Rage::Cable::Channel + # @private + INTERNAL_ACTIONS = [:subscribed, :unsubscribed] + + class << self + # @private + attr_reader :__prepared_actions + + # @private + attr_reader :__channels + + # @private + # returns a list of actions that can be called remotely + def __register_actions + actions = ( + public_instance_methods(true) - Rage::Cable::Channel.public_instance_methods(true) + ).reject { |m| m.start_with?("__rage_tmp") || m.start_with?("__run") } + + @__prepared_actions = (INTERNAL_ACTIONS + actions).each_with_object({}) do |action_name, memo| + memo[action_name] = __register_action_proc(action_name) + end + + actions - INTERNAL_ACTIONS + end + + # @private + # rubocop:disable Layout/HeredocIndentation, Layout/IndentationWidth, Layout/EndAlignment, Layout/ElseAlignment + def __register_action_proc(action_name) + if action_name == :subscribed && @__hooks + before_subscribe_chunk = if @__hooks[:before_subscribe] + lines = @__hooks[:before_subscribe].map do |h| + condition = if h[:if] && h[:unless] + "if #{h[:if]} && !#{h[:unless]}" + elsif h[:if] + "if #{h[:if]}" + elsif h[:unless] + "unless #{h[:unless]}" + end + + <<~RUBY + #{h[:name]} #{condition} + return if @__subscription_rejected + RUBY + end + + lines.join("\n") + end + + after_subscribe_chunk = if @__hooks[:after_subscribe] + lines = @__hooks[:after_subscribe].map do |h| + condition = if h[:if] && h[:unless] + "if #{h[:if]} && !#{h[:unless]}" + elsif h[:if] + "if #{h[:if]}" + elsif h[:unless] + "unless #{h[:unless]}" + end + + <<~RUBY + #{h[:name]} #{condition} + RUBY + end + + lines.join("\n") + end + end + + if action_name == :unsubscribed && @__hooks + before_unsubscribe_chunk = if @__hooks[:before_unsubscribe] + lines = @__hooks[:before_unsubscribe].map do |h| + condition = if h[:if] && h[:unless] + "if #{h[:if]} && !#{h[:unless]}" + elsif h[:if] + "if #{h[:if]}" + elsif h[:unless] + "unless #{h[:unless]}" + end + + <<~RUBY + #{h[:name]} #{condition} + RUBY + end + + lines.join("\n") + end + + after_unsubscribe_chunk = if @__hooks[:after_unsubscribe] + lines = @__hooks[:after_unsubscribe].map do |h| + condition = if h[:if] && h[:unless] + "if #{h[:if]} && !#{h[:unless]}" + elsif h[:if] + "if #{h[:if]}" + elsif h[:unless] + "unless #{h[:unless]}" + end + + <<~RUBY + #{h[:name]} #{condition} + RUBY + end + + lines.join("\n") + end + end + + rescue_handlers_chunk = if @__rescue_handlers + lines = @__rescue_handlers.map do |klasses, handler| + <<~RUBY + rescue #{klasses.join(", ")} => __e + #{instance_method(handler).arity == 0 ? handler : "#{handler}(__e)"} + RUBY + end + + lines.join("\n") + else + "" + end + + periodic_timers_chunk = if @__periodic_timers + set_up_periodic_timers + + if action_name == :subscribed + <<~RUBY + self.class.__channels << self unless subscription_rejected? + RUBY + elsif action_name == :unsubscribed + <<~RUBY + self.class.__channels.delete(self) + RUBY + end + else + "" + end + + is_subscribing = action_name == :subscribed + activerecord_loaded = defined?(::ActiveRecord) + + method_name = class_eval <<~RUBY, __FILE__, __LINE__ + 1 + def __run_#{action_name}(data) + #{if is_subscribing + <<~RUBY + @__is_subscribing = true + RUBY + end} + + #{before_subscribe_chunk} + #{before_unsubscribe_chunk} + + #{if instance_method(action_name).arity == 0 + <<~RUBY + #{action_name} + RUBY + else + <<~RUBY + #{action_name}(data) + RUBY + end} + + #{after_subscribe_chunk} + #{after_unsubscribe_chunk} + #{periodic_timers_chunk} + #{rescue_handlers_chunk} + + #{if activerecord_loaded + <<~RUBY + ensure + if ActiveRecord::Base.connection_pool.active_connection? + ActiveRecord::Base.connection_handler.clear_active_connections! + end + RUBY + end} + end + RUBY + + eval("->(channel, data) { channel.#{method_name}(data) }") + end + # rubocop:enable all + + # @private + def __prepare_id_method(method_name) + define_method(method_name) do + @__identified_by[method_name] + end + end + + # Register a new `before_subscribe` hook that will be called before the {subscribed} method. + # + # @example + # before_subscribe :my_method + # @example + # before_subscribe do + # ... + # end + # @example + # before_subscribe :my_method, if: -> { ... } + def before_subscribe(action_name = nil, **opts, &block) + add_action(:before_subscribe, action_name, **opts, &block) + end + + # Register a new `after_subscribe` hook that will be called after the {subscribed} method. + # + # @example + # after_subscribe do + # ... + # end + # @example + # after_subscribe :my_method, unless: :subscription_rejected? + # @note This callback will be triggered even if the subscription was rejected with the {reject} method. + def after_subscribe(action_name = nil, **opts, &block) + add_action(:after_subscribe, action_name, **opts, &block) + end + + # Register a new `before_unsubscribe` hook that will be called before the {unsubscribed} method. + def before_unsubscribe(action_name = nil, **opts, &block) + add_action(:before_unsubscribe, action_name, **opts, &block) + end + + # Register a new `after_unsubscribe` hook that will be called after the {unsubscribed} method. + def after_unsubscribe(action_name = nil, **opts, &block) + add_action(:after_unsubscribe, action_name, **opts, &block) + end + + # Register an exception handler. + # + # @param klasses [Class, Array] exception classes to watch on + # @param with [Symbol] the name of a handler method. The method can take one argument, which is the raised exception. Alternatively, you can pass a block, which can also take one argument. + # @example + # rescue_from StandardError, with: :report_error + # + # private + # + # def report_error(e) + # SomeExternalBugtrackingService.notify(e) + # end + # @example + # rescue_from StandardError do |e| + # SomeExternalBugtrackingService.notify(e) + # end + def rescue_from(*klasses, with: nil, &block) + unless with + if block_given? + with = define_tmp_method(block) + else + raise ArgumentError, "No handler provided. Pass the `with` keyword argument or provide a block." + end + end + + if @__rescue_handlers.nil? + @__rescue_handlers = [] + elsif @__rescue_handlers.frozen? + @__rescue_handlers = @__rescue_handlers.dup + end + + @__rescue_handlers.unshift([klasses, with]) + end + + # Set up a timer to periodically perform a task on the channel. Accepts a method name or a block. + # + # @param method_name [Symbol, nil] the name of the method to call + # @param every [Integer] the calling period in seconds + # @example + # periodically every: 3.minutes do + # transmit({ action: :update_count, count: current_count }) + # end + # @example + # periodically :update_count, every: 3.minutes + def periodically(method_name = nil, every:, &block) + callback_name = if block_given? + raise ArgumentError, "Pass the `method_name` argument or provide a block, not both" if method_name + define_tmp_method(block) + elsif method_name.is_a?(Symbol) + define_tmp_method(eval("-> { #{method_name} }")) + else + raise ArgumentError, "Expected a Symbol method name, got #{method_name.inspect}" + end + + unless every.is_a?(Numeric) && every > 0 + raise ArgumentError, "Expected every: to be a positive number of seconds, got #{every.inspect}" + end + + callback = eval("->(channel) { channel.#{callback_name} }") + + if @__periodic_timers.nil? + @__periodic_timers = [] + elsif @__periodic_timers.frozen? + @__periodic_timers = @__periodic_timers.dup + end + + @__periodic_timers << [callback, every] + end + + protected + + def set_up_periodic_timers + return if @__periodic_timers_set_up + + @__channels = Set.new + + @__periodic_timers.each do |callback, every| + ::Iodine.run_every((every * 1000).to_i) do + slice_length = (@__channels.length / 20.0).ceil + + if slice_length != 0 + @__channels.each_slice(slice_length) do |slice| + Fiber.schedule do + slice.each { |channel| callback.call(channel) } + rescue => e + Rage.logger.error("Unhandled exception has occured - #{e.class} (#{e.message}):\n#{e.backtrace.join("\n")}") + end + end + end + end + end + + @__periodic_timers_set_up = true + end + + def add_action(action_type, action_name = nil, **opts, &block) + if block_given? + action_name = define_tmp_method(block) + elsif action_name.nil? + raise ArgumentError, "No handler provided. Pass the `action_name` parameter or provide a block." + end + + _if, _unless = opts.values_at(:if, :unless) + + action = { + name: action_name, + if: _if, + unless: _unless + } + + action[:if] = define_tmp_method(action[:if]) if action[:if].is_a?(Proc) + action[:unless] = define_tmp_method(action[:unless]) if action[:unless].is_a?(Proc) + + if @__hooks.nil? + @__hooks = {} + elsif @__hooks[action_type] && @__hooks.frozen? + @__hooks = @__hooks.dup + @__hooks[action_type] = @__hooks[action_type].dup + end + + if @__hooks[action_type].nil? + @__hooks[action_type] = [action] + elsif (i = @__hooks[action_type].find_index { |a| a[:name] == action_name }) + @__hooks[action_type][i] = action + else + @__hooks[action_type] << action + end + end + + attr_writer :__hooks, :__rescue_handlers, :__periodic_timers + + def inherited(klass) + klass.__hooks = @__hooks.freeze + klass.__rescue_handlers = @__rescue_handlers.freeze + klass.__periodic_timers = @__periodic_timers.freeze + end + + @@__tmp_name_seed = ("a".."i").to_a.permutation + + def define_tmp_method(block) + name = @@__tmp_name_seed.next.join + define_method("__rage_tmp_#{name}", block) + end + end # class << self + + # @private + def __has_action?(action_name) + !INTERNAL_ACTIONS.include?(action_name) && self.class.__prepared_actions.has_key?(action_name) + end + + # @private + def __run_action(action_name, data = nil) + self.class.__prepared_actions[action_name].call(self, data) + end + + # @private + def initialize(connection, params, identified_by) + @__connection = connection + @__params = params + @__identified_by = identified_by + end + + # Get the params hash passed in during the subscription process. + # + # @return [Hash{Symbol=>String,Array,Hash,Numeric,NilClass,TrueClass,FalseClass}] + def params + @__params + end + + # Reject the subscription request. The method should only be called during the subscription + # process (i.e. inside the {subscribed} method or {before_subscribe}/{after_subscribe} hooks). + def reject + @__subscription_rejected = true + end + + # Checks whether the {reject} method has been called. + # + # @return [Boolean] + def subscription_rejected? + !!@__subscription_rejected + end + + # Subscribe to a stream. + # + # @param stream [String] the name of the stream + def stream_from(stream) + Rage.config.cable.protocol.subscribe(@__connection, stream, @__params) + end + + # Broadcast data to all the clients subscribed to a stream. + # + # @param stream [String] the name of the stream + # @param data [Object] the data to send to the clients + # @example + # def subscribed + # broadcast("notifications", { message: "A new member has joined!" }) + # end + def broadcast(stream, data) + Rage.config.cable.protocol.broadcast(stream, data) + end + + # Transmit data to the current client. + # + # @param data [Object] the data to send to the client + # @example + # def subscribed + # transmit({ message: "Hello!" }) + # end + def transmit(data) + message = Rage.config.cable.protocol.serialize(@__params, data) + + if @__is_subscribing + # we expect a confirmation message to be sent as a result of a successful subscribe call; + # this will make sure `transmit` calls send data after the confirmation; + ::Iodine.defer { @__connection.write(message) } + else + @__connection.write(message) + end + end + + # Called once a client has become a subscriber of the channel. + def subscribed + end + + # Called once a client unsubscribes from the channel. + def unsubscribed + end +end diff --git a/lib/rage/cable/connection.rb b/lib/rage/cable/connection.rb new file mode 100644 index 00000000..bb96c2a2 --- /dev/null +++ b/lib/rage/cable/connection.rb @@ -0,0 +1,78 @@ +# frozen_string_literal: true + +class Rage::Cable::Connection + # @private + attr_reader :__identified_by_map + + # Mark a key as being a connection identifier index that can then be used to find the specific connection again later. + # Common identifiers are `current_user` and `current_account`, but could be anything. + # + # @param identifiers [Symbol,Array] + def self.identified_by(*identifiers) + identifiers.each do |method_name| + define_method(method_name) do + @__identified_by_map[method_name] + end + + define_method("#{method_name}=") do |data| + @__identified_by_map[method_name] = data + end + + Rage::Cable::Channel.__prepare_id_method(method_name) + end + end + + # @private + def initialize(env, identified_by = {}) + @__env = env + @__identified_by_map = identified_by + end + + # @private + def connect + end + + # Reject the WebSocket connection. + def reject_unauthorized_connection + @rejected = true + end + + def rejected? + !!@rejected + end + + # Get the request object. See {Rage::Request}. + # + # @return [Rage::Request] + def request + @__request ||= Rage::Request.new(@__env) + end + + # Get the cookie object. See {Rage::Cookies}. + # + # @return [Rage::Cookies] + def cookies + @__cookies ||= Rage::Cookies.new(@__env, ReadOnlyHash.new) + end + + # Get the session object. See {Rage::Session}. + # + # @return [Rage::Session] + def session + @__session ||= Rage::Session.new(cookies) + end + + # Get URL query parameters. + # + # @return [Hash{Symbol=>String,Array,Hash}] + def params + @__params ||= Iodine::Rack::Utils.parse_nested_query(@__env["QUERY_STRING"]) + end + + # @private + class ReadOnlyHash < Hash + def []=(_, _) + raise "Cookies cannot be set for WebSocket clients" + end + end +end diff --git a/lib/rage/cable/protocol/actioncable_v1_json.rb b/lib/rage/cable/protocol/actioncable_v1_json.rb new file mode 100644 index 00000000..5b6630b7 --- /dev/null +++ b/lib/rage/cable/protocol/actioncable_v1_json.rb @@ -0,0 +1,167 @@ +# frozen_string_literal: true + +## +# A protocol defines the structure, rules and semantics for exchanging data between the client and the server. +# The class that defines a protocol should respond to the following methods: +# +# * `protocol_definition` +# * `init` +# * `on_open` +# * `on_message` +# * `serialize` +# * `subscribe` +# * `broadcast` +# +# The two optional methods are: +# +# * `on_shutdown` +# * `on_close` +# +class Rage::Cable::Protocol::ActioncableV1Json + module TYPE + WELCOME = "welcome" + DISCONNECT = "disconnect" + PING = "ping" + CONFIRM = "confirm_subscription" + REJECT = "reject_subscription" + end + + module REASON + UNAUTHORIZED = "unauthorized" + INVALID = "invalid_request" + end + + module COMMAND + SUBSCRIBE = "subscribe" + MESSAGE = "message" + end + + module MESSAGES + WELCOME = { type: TYPE::WELCOME }.to_json + UNAUTHORIZED = { type: TYPE::DISCONNECT, reason: REASON::UNAUTHORIZED, reconnect: false }.to_json + INVALID = { type: TYPE::DISCONNECT, reason: REASON::INVALID, reconnect: true }.to_json + end + + HANDSHAKE_HEADERS = { "Sec-WebSocket-Protocol" => "actioncable-v1-json" } + + # The method defines the headers to send to the client after the handshake process. + def self.protocol_definition + HANDSHAKE_HEADERS + end + + # This method serves as a constructor to prepare the object or set up recurring tasks (e.g. heartbeats). + # + # @param router [Rage::Cable::Router] + def self.init(router) + @router = router + + ping_counter = Time.now.to_i + ::Iodine.run_every(3000) do + ping_counter += 1 + ::Iodine.publish("cable:ping", { type: TYPE::PING, message: ping_counter }.to_json) + end + + # Hash Array(subscription params)> + @subscription_identifiers = Hash.new { |hash, key| hash[key] = [] } + end + + # The method is called any time a new WebSocket connection is established. + # It is expected to call {Rage::Cable::Router#process_connection} and handle its return value. + # + # @param connection [Rage::Cable::WebSocketConnection] the connection object + # @see Rage::Cable::Router + def self.on_open(connection) + accepted = @router.process_connection(connection) + + if accepted + connection.subscribe("cable:ping") + connection.write(MESSAGES::WELCOME) + else + connection.write(MESSAGES::UNAUTHORIZED) + connection.close + end + end + + # The method processes messages from existing connections. It should parse the message, call either + # {Rage::Cable::Router#process_subscription} or {Rage::Cable::Router#process_message}, and handle its return value. + # + # @param connection [Rage::Cable::WebSocketConnection] the connection object + # @param raw_data [String] the message body + # @see Rage::Cable::Router + def self.on_message(connection, raw_data) + parsed_data = Rage::ParamsParser.json_parse(raw_data) + + command, identifier = parsed_data[:command], parsed_data[:identifier] + params = Rage::ParamsParser.json_parse(identifier) + + # process subscription messages + if command == COMMAND::SUBSCRIBE + status = @router.process_subscription(connection, identifier, params[:channel], params) + if status == :subscribed + connection.write({ identifier: identifier, type: TYPE::CONFIRM }.to_json) + elsif status == :rejected + connection.write({ identifier: identifier, type: TYPE::REJECT }.to_json) + elsif status == :invalid + connection.write(MESSAGES::INVALID) + end + + return + end + + # process data messages; + # plain `JSON` is used here to conform with the ActionCable API that passes `data` as a Hash with string keys; + data = JSON.parse(parsed_data[:data]) + + message_status = if command == COMMAND::MESSAGE && data.has_key?("action") + @router.process_message(connection, identifier, data["action"].to_sym, data) + + elsif command == COMMAND::MESSAGE + @router.process_message(connection, identifier, :receive, data) + end + + unless message_status == :processed + connection.write(MESSAGES::INVALID) + end + end + + # The method should process client disconnections and call {Rage::Cable::Router#process_message}. + # + # @note This method is optional. + # @param connection [Rage::Cable::WebSocketConnection] the connection object + # @see Rage::Cable::Router + def self.on_close(connection) + @router.process_disconnection(connection) + end + + # Serialize a Ruby object into the format the client would understand. + # + # @param params [Hash] parameters associated with the client + # @param data [Object] the object to serialize + def self.serialize(params, data) + { identifier: params.to_json, message: data }.to_json + end + + # Subscribe to a stream. + # + # @param connection [Rage::Cable::WebSocketConnection] the connection object + # @param name [String] the stream name + # @param params [Hash] parameters associated with the client + def self.subscribe(connection, name, params) + connection.subscribe("cable:#{name}:#{params.hash}") + @subscription_identifiers[name] << params unless @subscription_identifiers[name].include?(params) + end + + # Broadcast data to all clients connected to a stream. + # + # @param name [String] the stream name + # @param data [Object] the data to send + def self.broadcast(name, data) + i, identifiers = 0, @subscription_identifiers[name] + + while i < identifiers.length + params = identifiers[i] + ::Iodine.publish("cable:#{name}:#{params.hash}", serialize(params, data)) + i += 1 + end + end +end diff --git a/lib/rage/cable/router.rb b/lib/rage/cable/router.rb new file mode 100644 index 00000000..c83c6644 --- /dev/null +++ b/lib/rage/cable/router.rb @@ -0,0 +1,138 @@ +# frozen_string_literal: true + +class Rage::Cable::Router + # @private + def initialize + # Hash Proc(new channel instance)> + @channels_map = {} + init_connection_class + end + + # Calls the `connect` method on the `Connection` class to handle authentication. + # + # @param connection [Rage::Cable::WebSocketConnection] the connection object + # @return [true] if the connection was accepted + # @return [false] if the connection was rejected + def process_connection(connection) + cable_connection = @connection_class.new(connection.env) + cable_connection.connect + + if cable_connection.rejected? + Rage.logger.debug { "An unauthorized connection attempt was rejected" } + else + connection.env["rage.identified_by"] = cable_connection.__identified_by_map + connection.env["rage.cable"] = {} + end + + !cable_connection.rejected? + end + + # Calls the `subscribed` method on the specified channel. + # + # @param connection [Rage::Cable::WebSocketConnection] the connection object + # @param identifier [String] the identifier of the subscription + # @param channel_name [String] the name of the channel class + # @param params [Hash] the params hash associated with the subscription + # + # @return [:invalid] if the subscription class does not exist + # @return [:rejected] if the subscription was rejected + # @return [:subscribed] if the subscription was accepted + def process_subscription(connection, identifier, channel_name, params) + channel_class = @channels_map[channel_name] || begin + begin + klass = Object.const_get(channel_name) + rescue NameError + nil + end + + if klass.nil? || !klass.ancestors.include?(Rage::Cable::Channel) + Rage.logger.debug { "Subscription class not found: #{channel_name}" } + return :invalid + end + + klass.__register_actions.tap do |available_actions| + Rage.logger.debug { "Compiled #{channel_name}. Available remote actions: #{available_actions}." } + end + + @channels_map[channel_name] = klass + end + + channel = channel_class.new(connection, params, connection.env["rage.identified_by"]) + channel.__run_action(:subscribed) + + if channel.subscription_rejected? + Rage.logger.debug { "#{channel_name} is transmitting the subscription rejection" } + # if the subscription is rejected in the `subscribed` method, ActionCable will additionally run + # the `unsubscribed` method; this makes little sense to me as the client was never subscribed in + # the first place; additionally, I don't think this behaviour is documented anywhere; + # so, I'm going to leave this line commented out for now; + # channel.__run_action(:unsubscribed) + :rejected + else + Rage.logger.debug { "#{channel_name} is transmitting the subscription confirmation" } + connection.env["rage.cable"][identifier] = channel + :subscribed + end + end + + # Calls the handler method on the specified channel. + # + # @param connection [Rage::Cable::WebSocketConnection] the connection object + # @param identifier [String] the identifier of the subscription + # @param action_name [Symbol] the name of the handler method + # @param data [Object] the data sent by the client + # + # @return [:no_subscription] if the client is not subscribed to the specified channel + # @return [:unknown_action] if the action does not exist on the specified channel + # @return [:processed] if the message has been successfully processed + def process_message(connection, identifier, action_name, data) + channel = connection.env["rage.cable"][identifier] + unless channel + Rage.logger.debug { "Unable to find the subscription" } + return :no_subscription + end + + if channel.__has_action?(action_name) + channel.__run_action(action_name, data) + :processed + else + Rage.logger.debug { "Unable to process #{channel.class.name}##{action_name}" } + :unknown_action + end + end + + # Runs the `unsubscribed` methods on all the channels the client is subscribed to. + # + # @param connection [Rage::Cable::WebSocketConnection] the connection object + def process_disconnection(connection) + connection.env["rage.cable"]&.each do |_, channel| + channel.__run_action(:unsubscribed) + end + + if @connection_can_disconnect + cable_connection = @connection_class.new(connection.env, connection.env["rage.identified_by"]) + cable_connection.disconnect + end + end + + # @private + def reset + @channels_map.clear + init_connection_class + end + + private + + def init_connection_class + @connection_class = if Object.const_defined?("RageCable::Connection") + RageCable::Connection + elsif Object.const_defined?("ApplicationCable::Connection") + ApplicationCable::Connection + else + puts "WARNING: Could not find the RageCable connection class! All connections will be accepted by default." + Rage::Cable::Connection + end + + @connection_can_disconnect = @connection_class.method_defined?(:disconnect) + end +end diff --git a/lib/rage/code_loader.rb b/lib/rage/code_loader.rb index fcb1a8bc..c89a5127 100644 --- a/lib/rage/code_loader.rb +++ b/lib/rage/code_loader.rb @@ -30,8 +30,13 @@ def reload @reloading = true @loader.reload + Rage.__router.reset_routes load("#{Rage.root}/config/routes.rb") + + unless Rage.autoload?(:Cable) # the `Cable` component is loaded + Rage::Cable.__router.reset + end end # in Rails mode - reset the routes; everything else will be done by Rails @@ -40,6 +45,10 @@ def rails_mode_reload @reloading = true Rage.__router.reset_routes + + unless Rage.autoload?(:Cable) # the `Cable` component is loaded + Rage::Cable.__router.reset + end end def reloading? diff --git a/lib/rage/configuration.rb b/lib/rage/configuration.rb index affbed55..e91978c4 100644 --- a/lib/rage/configuration.rb +++ b/lib/rage/configuration.rb @@ -93,6 +93,20 @@ # # > Specifies connection timeout. # +# # Cable Configuration +# +# • _config.cable.protocol_ +# +# > Specifies the protocol the server will use. The only value currently supported is `Rage::Cable::Protocol::ActioncableV1Json`. The client application will need to use [@rails/actioncable](https://www.npmjs.com/package/@rails/actioncable) to talk to the server. +# +# • _config.cable.allowed_request_origins_ +# +# > Restricts the server to only accept requests from specified origins. The origins can be instances of strings or regular expressions, against which a check for the match will be performed. +# +# • _config.cable.disable_request_forgery_protection_ +# +# > Allows requests from any origin. +# # # Transient Settings # # The settings described in this section should be configured using **environment variables** and are either temporary or will become the default in the future. @@ -138,6 +152,10 @@ def middleware @middleware ||= Middleware.new end + def cable + @cable ||= Cable.new + end + def internal @internal ||= Internal.new end @@ -193,6 +211,32 @@ def find_middleware_index(middleware) end end + class Cable + attr_accessor :protocol, :allowed_request_origins, :disable_request_forgery_protection + + def initialize + @protocol = Rage::Cable::Protocol::ActioncableV1Json + @allowed_request_origins = if Rage.env.development? || Rage.env.test? + /localhost/ + end + end + + # @private + def middlewares + @middlewares ||= begin + origin_middleware = if @disable_request_forgery_protection + [] + else + [[Rage::OriginValidator, Array(@allowed_request_origins), nil]] + end + + origin_middleware + Rage.config.middleware.middlewares.reject do |middleware, _, _| + middleware == Rage::FiberWrapper + end + end + end + end + # @private class Internal attr_accessor :rails_mode diff --git a/lib/rage/controller/api.rb b/lib/rage/controller/api.rb index e28f8067..9894fe1b 100644 --- a/lib/rage/controller/api.rb +++ b/lib/rage/controller/api.rb @@ -365,13 +365,13 @@ def response # Get the cookie object. See {Rage::Cookies}. # @return [Rage::Cookies] def cookies - @cookies ||= Rage::Cookies.new(@__env, self) + @cookies ||= Rage::Cookies.new(@__env, @__headers) end # Get the session object. See {Rage::Session}. # @return [Rage::Session] def session - @session ||= Rage::Session.new(self) + @session ||= Rage::Session.new(cookies) end # Send a response to the client. diff --git a/lib/rage/cookies.rb b/lib/rage/cookies.rb index 70772531..5398b8c5 100644 --- a/lib/rage/cookies.rb +++ b/lib/rage/cookies.rb @@ -14,9 +14,9 @@ class Rage::Cookies # @private - def initialize(env, controller) + def initialize(env, headers) @env = env - @headers = controller.headers + @headers = headers @request_cookies = {} @parsed = false diff --git a/lib/rage/middleware/fiber_wrapper.rb b/lib/rage/middleware/fiber_wrapper.rb index 2f0c4fed..50538cce 100644 --- a/lib/rage/middleware/fiber_wrapper.rb +++ b/lib/rage/middleware/fiber_wrapper.rb @@ -7,7 +7,9 @@ class Rage::FiberWrapper def initialize(app) Iodine.on_state(:on_start) do - Fiber.set_scheduler(Rage::FiberScheduler.new) + unless Fiber.scheduler + Fiber.set_scheduler(Rage::FiberScheduler.new) + end end @app = app end diff --git a/lib/rage/middleware/origin_validator.rb b/lib/rage/middleware/origin_validator.rb new file mode 100644 index 00000000..03e193c1 --- /dev/null +++ b/lib/rage/middleware/origin_validator.rb @@ -0,0 +1,38 @@ +# frozen_string_literal: true + +class Rage::OriginValidator + def initialize(app, *allowed_origins) + @app = app + @validator = build_validator(allowed_origins) + end + + def call(env) + if @validator.call(env) + @app.call(env) + else + Rage.logger.error("Request origin not allowed: #{env["HTTP_ORIGIN"]}") + [404, {}, ["Not Found"]] + end + end + + private + + def build_validator(allowed_origins) + if allowed_origins.empty? + ->(env) { false } + else + origins_eval = allowed_origins.map { |origin| + origin.is_a?(Regexp) ? + "origin =~ /#{origin.source}/.freeze" : + "origin == '#{origin}'.freeze" + }.join(" || ") + + eval <<-RUBY + ->(env) do + origin = env["HTTP_ORIGIN".freeze] + #{origins_eval} + end + RUBY + end + end +end diff --git a/lib/rage/session.rb b/lib/rage/session.rb index 4cdec10b..23667b61 100644 --- a/lib/rage/session.rb +++ b/lib/rage/session.rb @@ -7,8 +7,8 @@ class Rage::Session KEY = Rack::RACK_SESSION.to_sym # @private - def initialize(controller) - @cookies = controller.cookies.encrypted + def initialize(cookies) + @cookies = cookies.encrypted end # Writes the value to the session. diff --git a/spec/cable/channel/actions_spec.rb b/spec/cable/channel/actions_spec.rb new file mode 100644 index 00000000..89b08e16 --- /dev/null +++ b/spec/cable/channel/actions_spec.rb @@ -0,0 +1,150 @@ +# frozen_string_literal: true + +module CableChannelActionsSpec + class TestChannel < Rage::Cable::Channel + end + + class TestChannel1 < Rage::Cable::Channel + def receive + end + end + + class TestChannel2 < Rage::Cable::Channel + def subscribed + end + + def receive + end + + def unsubscribed + end + end + + class TestChannel3 < TestChannel2 + def connect + end + end + + class TestChannel4 < Rage::Cable::Channel + protected def protected_action + end + + private def private_action + end + end + + class TestChannel5 < Rage::Cable::Channel + before_subscribe do + verifier.before_subscribe + end + + after_subscribe do + verifier.after_subscribe + end + + def receive + verifier.receive + end + end +end + +RSpec.describe Rage::Cable::Channel do + let(:klass_instance) { klass.tap(&:__register_actions).new(nil, nil, nil) } + + context "with default actions" do + let(:klass) { CableChannelActionsSpec::TestChannel } + + it "correctly registers actions" do + expect(klass.__register_actions).to be_empty + end + + describe "#__has_action?" do + it "correctly fetches available actions" do + expect(klass_instance.__has_action?(:subscribed)).to be(false) + expect(klass_instance.__has_action?(:unsubscribed)).to be(false) + end + end + end + + context "with a custom action" do + let(:klass) { CableChannelActionsSpec::TestChannel1 } + + it "correctly registers actions" do + expect(klass.__register_actions).to eq(%i(receive)) + end + + describe "#__has_action?" do + it "correctly fetches available actions" do + expect(klass_instance.__has_action?(:receive)).to be(true) + end + end + end + + context "with a custom action and subscription callbacks" do + let(:klass) { CableChannelActionsSpec::TestChannel2 } + + it "correctly registers actions" do + expect(klass.__register_actions).to eq(%i(receive)) + end + + describe "#__has_action?" do + it "correctly fetches available actions" do + expect(klass_instance.__has_action?(:receive)).to be(true) + end + end + end + + context "with inheritance" do + let(:klass) { CableChannelActionsSpec::TestChannel3 } + + it "correctly registers actions" do + expect(klass.__register_actions).to match_array(%i(receive connect)) + end + + describe "#__has_action?" do + it "correctly fetches available actions" do + expect(klass_instance.__has_action?(:receive)).to be(true) + expect(klass_instance.__has_action?(:connect)).to be(true) + end + end + end + + context "with protected and private methods" do + let(:klass) { CableChannelActionsSpec::TestChannel4 } + + it "correctly registers actions" do + expect(klass.__register_actions).to be_empty + end + end + + context "with before/after subscribe and custom action" do + let(:klass) { CableChannelActionsSpec::TestChannel5 } + + it "correctly registers actions" do + expect(klass.__register_actions).to eq([:receive]) + end + + describe "#__has_action?" do + it "correctly fetches available actions" do + expect(klass_instance.__has_action?(:receive)).to be(true) + end + end + + describe "#__run_action" do + let(:verifier) { double } + + before do + allow_any_instance_of(Rage::Cable::Channel).to receive(:verifier).and_return(verifier) + klass.__register_actions + end + + it "doesn't call before/after subscribe when calling custom action" do + expect(verifier).to receive(:receive).once + expect(verifier).not_to receive(:before_subscribe) + expect(verifier).not_to receive(:after_subscribe) + + klass_instance.__run_action(:receive) + end + end + end +end diff --git a/spec/cable/channel/attributes_spec.rb b/spec/cable/channel/attributes_spec.rb new file mode 100644 index 00000000..c8a5bbaa --- /dev/null +++ b/spec/cable/channel/attributes_spec.rb @@ -0,0 +1,24 @@ +# frozen_string_literal: true + +RSpec.describe Rage::Cable::Channel do + describe "#params" do + subject { described_class.new(nil, :test_params, nil) } + + it "correctly returns params" do + expect(subject.params).to eq(:test_params) + end + end + + describe "#subscription_rejected?" do + subject { described_class.new(nil, nil, nil) } + + it "does not reject by default" do + expect(subject).not_to be_subscription_rejected + end + + it "correctly rejects subscription" do + subject.reject + expect(subject).to be_subscription_rejected + end + end +end diff --git a/spec/cable/channel/data_spec.rb b/spec/cable/channel/data_spec.rb new file mode 100644 index 00000000..ba530ae2 --- /dev/null +++ b/spec/cable/channel/data_spec.rb @@ -0,0 +1,56 @@ +# frozen_string_literal: true + +module CableChannelDataSpec + class TestChannel < Rage::Cable::Channel + def receive + verifier.receive + end + end + + class TestChannel2 < Rage::Cable::Channel + def receive(data) + verifier.receive(data) + end + end +end + +RSpec.describe Rage::Cable::Channel do + subject { klass.tap(&:__register_actions).new(nil, nil, nil) } + + let(:verifier) { double } + + before do + allow_any_instance_of(Rage::Cable::Channel).to receive(:verifier).and_return(verifier) + end + + context "expecting no data" do + let(:klass) { CableChannelDataSpec::TestChannel } + + it "correctly processes remote method calls with no data" do + expect(verifier).to receive(:receive).once + subject.__run_action(:receive) + end + + it "correctly processes remote method calls with data" do + expect(verifier).to receive(:receive).once + subject.__run_action(:receive, :test_data) + end + end + + context "expecting data" do + let(:klass) { CableChannelDataSpec::TestChannel2 } + + it "correctly processes remote method calls" do + expect(verifier).to receive(:receive).with(:test_data).once + subject.__run_action(:receive, :test_data) + end + + it "doesn't cache data" do + expect(verifier).to receive(:receive).with(:test_data).once + expect(verifier).to receive(:receive).with(:another_test_data).once + + subject.__run_action(:receive, :test_data) + subject.__run_action(:receive, :another_test_data) + end + end +end diff --git a/spec/cable/channel/identified_by_spec.rb b/spec/cable/channel/identified_by_spec.rb new file mode 100644 index 00000000..84d9c874 --- /dev/null +++ b/spec/cable/channel/identified_by_spec.rb @@ -0,0 +1,22 @@ +# frozen_string_literal: true + +module CableChannelIdentifiedBySpec + class TestChannel < Rage::Cable::Channel + end +end + +RSpec.describe Rage::Cable::Channel do + subject { CableChannelIdentifiedBySpec::TestChannel.new(nil, nil, identified_by) } + + let(:identified_by) { { current_user: :test_user, current_account: :test_account } } + + before do + CableChannelIdentifiedBySpec::TestChannel.__prepare_id_method(:current_user) + CableChannelIdentifiedBySpec::TestChannel.__prepare_id_method(:current_account) + end + + it "correctly delegates identified_by methods" do + expect(subject.current_user).to eq(:test_user) + expect(subject.current_account).to eq(:test_account) + end +end diff --git a/spec/cable/channel/rescue_from_spec.rb b/spec/cable/channel/rescue_from_spec.rb new file mode 100644 index 00000000..9e7d0ed3 --- /dev/null +++ b/spec/cable/channel/rescue_from_spec.rb @@ -0,0 +1,251 @@ +# frozen_string_literal: true + +module CableChannelRescueFromSpec + class TestChannel < Rage::Cable::Channel + rescue_from ZeroDivisionError do + verifier.rescue_from + end + + def subscribed + raise ZeroDivisionError + end + end + + class TestChannel2 < Rage::Cable::Channel + rescue_from ZeroDivisionError do + verifier.rescue_from + end + + def subscribed + raise NameError + end + end + + class TestChannel3 < Rage::Cable::Channel + rescue_from NameError do + verifier.rescue_from_name_error + end + + rescue_from ZeroDivisionError do + verifier.rescue_from_zero_error + end + + def subscribed + raise ZeroDivisionError + end + end + + class TestChannel4 < TestChannel3 + def subscribed + raise NameError + end + end + + class TestChannel5 < Rage::Cable::Channel + rescue_from NameError, ZeroDivisionError do + verifier.rescue_from + end + + def subscribed + raise ZeroDivisionError + end + end + + class TestChannel6 < Rage::Cable::Channel + rescue_from ZeroDivisionError, with: :process_exception + + def subscribed + raise ZeroDivisionError + end + + private def process_exception + verifier.rescue_from + end + end + + class TestChannel7 < Rage::Cable::Channel + rescue_from StandardError do + verifier.rescue_from + end + + def subscribed + raise ZeroDivisionError + end + end + + class TestChannel8 < Rage::Cable::Channel + rescue_from ZeroDivisionError do + verifier.rescue_from + end + + def receive + raise ZeroDivisionError + end + end + + class TestChannel9 < Rage::Cable::Channel + rescue_from ZeroDivisionError do + verifier.rescue_from_zero_error + end + + rescue_from StandardError do + verifier.rescue_from_standard_error + end + + def subscribed + raise ZeroDivisionError + end + end + + class TestChannel10 < Rage::Cable::Channel + rescue_from ZeroDivisionError do + verifier.rescue_from + end + + before_subscribe do + raise ZeroDivisionError + end + end + + class TestChannel11 < Rage::Cable::Channel + rescue_from ZeroDivisionError do |exception| + verifier.rescue_from(exception) + end + + def subscribed + raise ZeroDivisionError + end + end + + class TestChannel12 < Rage::Cable::Channel + rescue_from ZeroDivisionError, with: :process_exception + + def subscribed + raise ZeroDivisionError + end + + private def process_exception(exception) + verifier.rescue_from(exception) + end + end +end + +RSpec.describe Rage::Cable::Channel do + subject { klass.tap(&:__register_actions).new(nil, nil, nil).__run_action(:subscribed) } + + let(:verifier) { double } + + before do + allow_any_instance_of(Rage::Cable::Channel).to receive(:verifier).and_return(verifier) + end + + context "with rescue_from" do + let(:klass) { CableChannelRescueFromSpec::TestChannel } + + it "correctly processes exceptions" do + expect(verifier).to receive(:rescue_from).once + subject + end + end + + context "with rescue_from and unexpected error" do + let(:klass) { CableChannelRescueFromSpec::TestChannel2 } + + it "correctly processes exceptions" do + expect(verifier).not_to receive(:rescue_from_zero_error) + expect { subject }.to raise_error(NameError) + end + end + + context "with multiple rescue_from" do + let(:klass) { CableChannelRescueFromSpec::TestChannel3 } + + it "correctly processes exceptions" do + expect(verifier).to receive(:rescue_from_zero_error).once + subject + end + end + + context "with inheritance" do + let(:klass) { CableChannelRescueFromSpec::TestChannel4 } + + it "correctly processes exceptions" do + expect(verifier).to receive(:rescue_from_name_error).once + subject + end + end + + context "with rescue_from with multiple error classes" do + let(:klass) { CableChannelRescueFromSpec::TestChannel5 } + + it "correctly processes exceptions" do + expect(verifier).to receive(:rescue_from).once + subject + end + end + + context "with a method handler" do + let(:klass) { CableChannelRescueFromSpec::TestChannel6 } + + it "correctly processes exceptions" do + expect(verifier).to receive(:rescue_from).once + subject + end + end + + context "with StandardError" do + let(:klass) { CableChannelRescueFromSpec::TestChannel7 } + + it "correctly processes exceptions" do + expect(verifier).to receive(:rescue_from).once + subject + end + end + + context "with custom action" do + subject { klass.tap(&:__register_actions).new(nil, nil, nil).__run_action(:receive) } + + let(:klass) { CableChannelRescueFromSpec::TestChannel8 } + + it "correctly processes exceptions" do + expect(verifier).to receive(:rescue_from).once + subject + end + end + + context "with multiple rescue_from matching the error" do + let(:klass) { CableChannelRescueFromSpec::TestChannel9 } + + it "matches handlers from bottom to top" do + expect(verifier).to receive(:rescue_from_standard_error).once + subject + end + end + + context "with error in before_subscribe" do + let(:klass) { CableChannelRescueFromSpec::TestChannel10 } + + it "correctly processes exceptions" do + expect(verifier).to receive(:rescue_from).once + subject + end + end + + context "with block handler accepting the exception" do + let(:klass) { CableChannelRescueFromSpec::TestChannel11 } + + it "correctly processes exceptions" do + expect(verifier).to receive(:rescue_from).with(ZeroDivisionError).once + subject + end + end + + context "with method handler accepting the exception" do + let(:klass) { CableChannelRescueFromSpec::TestChannel12 } + + it "correctly processes exceptions" do + expect(verifier).to receive(:rescue_from).with(ZeroDivisionError).once + subject + end + end +end diff --git a/spec/cable/channel/subscribe_spec.rb b/spec/cable/channel/subscribe_spec.rb new file mode 100644 index 00000000..bf94c72c --- /dev/null +++ b/spec/cable/channel/subscribe_spec.rb @@ -0,0 +1,331 @@ +# frozen_string_literal: true + +module CableChannelSubscribeSpec + class TestChannel < Rage::Cable::Channel + def subscribed + verifier.subscribed + end + end + + class TestChannel2 < Rage::Cable::Channel + before_subscribe do + verifier.before_subscribe + end + + def subscribed + verifier.subscribed + end + end + + class TestChannel3 < Rage::Cable::Channel + before_subscribe do + verifier.before_subscribe + end + + after_subscribe :verify_after_subscribe + + def subscribed + verifier.subscribed + end + + private def verify_after_subscribe + verifier.after_subscribe + end + end + + class TestChannel4 < Rage::Cable::Channel + before_subscribe do + verifier.before_subscribe + end + + after_subscribe do + verifier.after_subscribe + end + + def subscribed + reject + end + end + + class TestChannel5 < Rage::Cable::Channel + before_subscribe do + reject + end + + after_subscribe do + verifier.after_subscribe + end + + def subscribed + verifier.subscribed + end + end + + class TestChannel6 < Rage::Cable::Channel + after_subscribe :verify_after_subscribe + + def subscribed + reject + end + + private def verify_after_subscribe + verifier.after_subscribe + end + end + + class TestChannel7 < Rage::Cable::Channel + after_subscribe :verify_after_subscribe, unless: :subscription_rejected? + + def subscribed + reject + end + + private def verify_after_subscribe + verifier.after_subscribe + end + end + + class TestChannel8 < Rage::Cable::Channel + before_subscribe :verify_before_subscribe, if: :before_subscribe? + after_subscribe :verify_after_subscribe, if: :after_subscribe? + + def subscribed + end + + private + + def before_subscribe? + true + end + + def verify_before_subscribe + verifier.before_subscribe + end + + def after_subscribe? + true + end + + def verify_after_subscribe + verifier.after_subscribe + end + end + + class TestChannel9 < Rage::Cable::Channel + before_subscribe :verify_before_subscribe, if: :before_subscribe? + after_subscribe :verify_after_subscribe, if: :after_subscribe? + + def subscribed + end + + private + + def before_subscribe? + false + end + + def verify_before_subscribe + verifier.before_subscribe + end + + def after_subscribe? + false + end + + def verify_after_subscribe + verifier.after_subscribe + end + end + + class TestChannel10 < Rage::Cable::Channel + end + + class TestChannel11 < Rage::Cable::Channel + before_subscribe do + verifier.before_subscribe + end + end + + class TestChannel12 < Rage::Cable::Channel + before_subscribe do + verifier.before_subscribe + end + + after_subscribe do + verifier.after_subscribe + end + end + + class TestChannel13 < Rage::Cable::Channel + before_subscribe do + verifier.before_subscribe_1 + end + + before_subscribe do + verifier.before_subscribe_2 + end + + def subscribed + verifier.subscribed + end + end + + class TestChannel14 < TestChannel13 + before_subscribe do + verifier.before_subscribe_3 + end + + after_subscribe do + verifier.after_subscribe + end + end +end + +RSpec.describe Rage::Cable::Channel do + subject { klass.tap(&:__register_actions).new(nil, nil, nil).__run_action(:subscribed) } + + let(:verifier) { double } + + before do + allow_any_instance_of(Rage::Cable::Channel).to receive(:verifier).and_return(verifier) + end + + context "with the subscribed callback" do + let(:klass) { CableChannelSubscribeSpec::TestChannel } + + it "correctly runs the subscribed callback" do + expect(verifier).to receive(:subscribed).once + subject + end + end + + context "with before_subscribe" do + let(:klass) { CableChannelSubscribeSpec::TestChannel2 } + + it "correctly runs the subscribed callback" do + expect(verifier).to receive(:before_subscribe).once + expect(verifier).to receive(:subscribed).once + subject + end + end + + context "with before_subscribe and after_subscribe" do + let(:klass) { CableChannelSubscribeSpec::TestChannel3 } + + it "correctly runs the subscribed callback" do + expect(verifier).to receive(:before_subscribe).once + expect(verifier).to receive(:subscribed).once + expect(verifier).to receive(:after_subscribe).once + subject + end + end + + context "with subscription rejection" do + let(:klass) { CableChannelSubscribeSpec::TestChannel4 } + + it "correctly runs the subscribed callback" do + expect(verifier).to receive(:before_subscribe).once + expect(verifier).to receive(:after_subscribe).once + subject + end + end + + context "with rejection in before_subscribe" do + let(:klass) { CableChannelSubscribeSpec::TestChannel5 } + + it "correctly runs the subscribed callback" do + expect(verifier).not_to receive(:subscribed) + expect(verifier).not_to receive(:after_subscribe) + subject + end + end + + context "with rejection and after_subscribe" do + let(:klass) { CableChannelSubscribeSpec::TestChannel6 } + + it "correctly runs the subscribed callback" do + expect(verifier).to receive(:after_subscribe).once + subject + end + end + + context "with rejection and after_subscribe with the subscription_rejected? guard" do + let(:klass) { CableChannelSubscribeSpec::TestChannel7 } + + it "correctly runs the subscribed callback" do + expect(verifier).not_to receive(:after_subscribe) + subject + end + end + + context "with before/after subscribe and true conditionals" do + let(:klass) { CableChannelSubscribeSpec::TestChannel8 } + + it "correctly runs the subscribed callback" do + expect(verifier).to receive(:before_subscribe) + expect(verifier).to receive(:after_subscribe) + subject + end + end + + context "with before/after subscribe and false conditionals" do + let(:klass) { CableChannelSubscribeSpec::TestChannel9 } + + it "correctly runs the subscribed callback" do + expect(verifier).not_to receive(:before_subscribe) + expect(verifier).not_to receive(:after_subscribe) + subject + end + end + + context "with the default callback" do + let(:klass) { CableChannelSubscribeSpec::TestChannel10 } + + it "correctly runs the subscribed callback" do + expect { subject }.not_to raise_error + end + end + + context "with the default callback and before_subscribe" do + let(:klass) { CableChannelSubscribeSpec::TestChannel11 } + + it "correctly runs the subscribed callback" do + expect(verifier).to receive(:before_subscribe) + subject + end + end + + context "with the default callback and before/after subscribe" do + let(:klass) { CableChannelSubscribeSpec::TestChannel12 } + + it "correctly runs the subscribed callback" do + expect(verifier).to receive(:before_subscribe) + expect(verifier).to receive(:after_subscribe) + subject + end + end + + context "with multiple before_subscribe callbacks" do + let(:klass) { CableChannelSubscribeSpec::TestChannel13 } + + it "correctly runs the subscribed callback" do + expect(verifier).to receive(:before_subscribe_1) + expect(verifier).to receive(:before_subscribe_2) + expect(verifier).to receive(:subscribed) + subject + end + end + + context "with inheritance" do + let(:klass) { CableChannelSubscribeSpec::TestChannel14 } + + it "correctly runs the subscribed callback" do + expect(verifier).to receive(:before_subscribe_1) + expect(verifier).to receive(:before_subscribe_2) + expect(verifier).to receive(:before_subscribe_3) + expect(verifier).to receive(:subscribed) + expect(verifier).to receive(:after_subscribe) + subject + end + end +end diff --git a/spec/cable/channel/unsubscribe_spec.rb b/spec/cable/channel/unsubscribe_spec.rb new file mode 100644 index 00000000..73e80a6c --- /dev/null +++ b/spec/cable/channel/unsubscribe_spec.rb @@ -0,0 +1,115 @@ +# frozen_string_literal: true + +module CableChannelUnsubscribeSpec + class TestChannel < Rage::Cable::Channel + def unsubscribed + verifier.unsubscribed + end + end + + class TestChannel2 < Rage::Cable::Channel + before_unsubscribe do + verifier.before_unsubscribe + end + + after_unsubscribe do + verifier.after_unsubscribe + end + + def unsubscribed + verifier.unsubscribed + end + end + + class TestChannel3 < Rage::Cable::Channel + before_unsubscribe do + verifier.before_unsubscribe + end + + after_unsubscribe do + verifier.after_unsubscribe + end + end + + class TestChannel4 < TestChannel3 + def unsubscribed + verifier.unsubscribed + end + end + + class TestChannel5 < Rage::Cable::Channel + before_unsubscribe :verify_before_unsubscribe, if: -> { false } + before_unsubscribe :verify_after_unsubscribe, if: -> { true } + + private + + def verify_before_unsubscribe + verifier.before_unsubscribe + end + + def verify_after_unsubscribe + verifier.after_unsubscribe + end + end +end + +RSpec.describe Rage::Cable::Channel do + subject { klass.tap(&:__register_actions).new(nil, nil, nil).__run_action(:unsubscribed) } + + let(:verifier) { double } + + before do + allow_any_instance_of(Rage::Cable::Channel).to receive(:verifier).and_return(verifier) + end + + context "with the unsubscribed callback" do + let(:klass) { CableChannelUnsubscribeSpec::TestChannel } + + it "correctly runs the unsubscribed callback" do + expect(verifier).to receive(:unsubscribed) + subject + end + end + + context "with before/after unsubscribe" do + let(:klass) { CableChannelUnsubscribeSpec::TestChannel2 } + + it "correctly runs the unsubscribed callback" do + expect(verifier).to receive(:before_unsubscribe) + expect(verifier).to receive(:unsubscribed) + expect(verifier).to receive(:after_unsubscribe) + subject + end + end + + context "with implicit unsubscribed callback" do + let(:klass) { CableChannelUnsubscribeSpec::TestChannel3 } + + it "correctly runs the unsubscribed callback" do + expect(verifier).to receive(:before_unsubscribe) + expect(verifier).to receive(:after_unsubscribe) + subject + end + end + + context "with inheritance" do + let(:klass) { CableChannelUnsubscribeSpec::TestChannel4 } + + it "correctly runs the unsubscribed callback" do + expect(verifier).to receive(:before_unsubscribe) + expect(verifier).to receive(:unsubscribed) + expect(verifier).to receive(:after_unsubscribe) + subject + end + end + + context "with conditionals" do + let(:klass) { CableChannelUnsubscribeSpec::TestChannel5 } + + it "correctly runs the unsubscribed callback" do + expect(verifier).not_to receive(:before_unsubscribe) + expect(verifier).to receive(:after_unsubscribe) + subject + end + end +end diff --git a/spec/cable/connection_spec.rb b/spec/cable/connection_spec.rb new file mode 100644 index 00000000..80f8e429 --- /dev/null +++ b/spec/cable/connection_spec.rb @@ -0,0 +1,62 @@ +# frozen_string_literal: true + +require "domain_name" + +RSpec.describe Rage::Cable::Connection do + describe ".identified_by" do + subject { described_class.new(nil) } + + it "defines accessor methods" do + described_class.identified_by(:test_user) + + expect(subject.test_user).to be_nil + subject.test_user = :user + expect(subject.test_user).to eq(:user) + end + + it "defines channel methods" do + expect(Rage::Cable::Channel).to receive(:__prepare_id_method).with(:test_user).once + described_class.identified_by(:test_user) + end + + context "with identified_by data" do + subject { described_class.new(nil, { test_user: :user_2 }) } + + it "allows to access the data" do + described_class.identified_by(:test_user) + expect(subject.test_user).to eq(:user_2) + end + end + end + + describe "#request" do + subject { described_class.new({ "HTTP_SEC_WEBSOCKET_PROTOCOL" => "test-protocol" }) } + + it "correctly initializes the request object" do + expect(subject.request).to be_a(Rage::Request) + expect(subject.request.headers["Sec-Websocket-Protocol"]).to eq("test-protocol") + end + end + + describe "#cookies" do + subject { described_class.new({ "HTTP_COOKIE" => "user_id=test-user-id" }) } + + it "correctly initializes the cookies object" do + expect(subject.cookies).to be_a(Rage::Cookies) + expect(subject.cookies[:user_id]).to eq("test-user-id") + end + + it "doesn't allow to update cookies" do + expect { subject.cookies[:user_id] = 111 }.to raise_error(/cannot be set/) + end + end + + describe "#params" do + subject { described_class.new({ "QUERY_STRING" => "user_id=test-user-id" }) } + + it "correctly parses parameters" do + expect(subject.params).to be_a(Hash) + expect(subject.params[:user_id]).to eq("test-user-id") + end + end +end diff --git a/spec/cable/router_spec.rb b/spec/cable/router_spec.rb new file mode 100644 index 00000000..6a926015 --- /dev/null +++ b/spec/cable/router_spec.rb @@ -0,0 +1,196 @@ +# frozen_string_literal: true + +RSpec.describe Rage::Cable::Router do + let(:connection) { instance_double("Iodine::Connection", env: {}) } + + before do + allow(Rage).to receive_message_chain(:logger, :debug) do |&block| + puts(block.call) + end + end + + describe "#process_connection" do + subject { described_class.new.process_connection(connection) } + + let(:cable_connection_class) { double } + let(:cable_connection_instance) { double(connect: nil, __identified_by_map: :test_identified_by) } + + before do + stub_const("RageCable::Connection", cable_connection_class) + allow(cable_connection_class).to receive(:method_defined?).with(:disconnect).and_return(true) + allow(cable_connection_class).to receive(:new).and_return(cable_connection_instance) + end + + context "when connection is accepted" do + before do + allow(cable_connection_instance).to receive(:rejected?).and_return(false) + end + + it "accepts a connection" do + expect(subject).to be(true) + end + + it "populates the env hash" do + subject + expect(connection.env["rage.identified_by"]).to eq(:test_identified_by) + expect(connection.env["rage.cable"]).to eq({}) + end + end + + context "when connection is rejected" do + before do + allow(cable_connection_instance).to receive(:rejected?).and_return(true) + end + + it "rejects a connection" do + expect(subject).to be(false) + end + + it "doesn't populate the env hash" do + subject + expect(connection.env).to be_empty + end + end + end + + describe "#process_subscription" do + subject { described_class.new.process_subscription(connection, :test_identifier, channel_name, :test_params) } + + let(:channel_name) { "TestChannel" } + let(:cable_channel_class) { double } + let(:channel) { instance_double("Rage::Cable::Channel") } + + context "with correct channel name" do + before do + stub_const(channel_name, cable_channel_class) + allow(cable_channel_class).to receive(:ancestors).and_return([Rage::Cable::Channel]) + + connection.env["rage.cable"] = {} + connection.env["rage.identified_by"] = :test_identified_by + end + + it "accepts the subscription" do + expect(cable_channel_class).to receive(:__register_actions).once + expect(cable_channel_class).to receive(:new).with(connection, :test_params, :test_identified_by).and_return(channel) + + expect(channel).to receive(:__run_action).with(:subscribed) + expect(channel).to receive(:subscription_rejected?).and_return(false) + + expect(subject).to eq(:subscribed) + expect(connection.env["rage.cable"][:test_identifier]).to eq(channel) + end + + context "with rejection" do + it "rejects the subscription" do + expect(cable_channel_class).to receive(:__register_actions).once + expect(cable_channel_class).to receive(:new).with(connection, :test_params, :test_identified_by).and_return(channel) + + expect(channel).to receive(:__run_action).with(:subscribed) + expect(channel).to receive(:subscription_rejected?).and_return(true) + + expect(subject).to eq(:rejected) + expect(connection.env["rage.cable"]).to be_empty + end + end + end + + context "with incorrect channel name" do + let(:channel_name) { "Array" } + + it "rejects the subscription" do + expect(subject).to eq(:invalid) + expect(connection.env).to be_empty + end + end + + context "with incorrect constant name" do + let(:channel_name) { ";;;;;;;" } + + it "rejects the subscription" do + expect(subject).to eq(:invalid) + expect(connection.env).to be_empty + end + end + end + + describe "#process_message" do + subject { described_class.new.process_message(connection, :test_identifier, :test_action, :test_data) } + + context "with existing subscription" do + let(:channel) { double } + + before do + connection.env["rage.cable"] = { test_identifier: channel } + end + + context "with existing action" do + it "processes the message" do + expect(channel).to receive(:__has_action?).with(:test_action).and_return(true) + expect(channel).to receive(:__run_action).with(:test_action, :test_data) + expect(subject).to eq(:processed) + end + end + + context "with unknown action" do + it "discards the message" do + expect(channel).to receive(:__has_action?).with(:test_action).and_return(false) + expect(channel).not_to receive(:__run_action) + expect(subject).to eq(:unknown_action) + end + end + end + + context "with no subscription" do + before do + connection.env["rage.cable"] = {} + end + + it "discards the message" do + expect(subject).to eq(:no_subscription) + end + end + end + + describe "#process_disconnection" do + subject { described_class.new.process_disconnection(connection) } + + context "with existing subscription" do + let(:channel) { double } + + before do + connection.env["rage.cable"] = { test_identifier: channel } + end + + it "runs the unsubscribed callback" do + expect(channel).to receive(:__run_action).with(:unsubscribed) + subject + end + end + + context "with no subscription" do + before do + connection.env["rage.cable"] = nil + end + + it "runs successfully" do + expect { subject }.not_to raise_error + end + end + + context "with connection" do + let(:cable_connection_class) { double } + let(:cable_connection_instance) { double(connect: nil, __identified_by_map: :test_identified_by) } + + before do + stub_const("RageCable::Connection", cable_connection_class) + allow(cable_connection_class).to receive(:method_defined?).with(:disconnect).and_return(true) + allow(cable_connection_class).to receive(:new).and_return(cable_connection_instance) + end + + it "calls disconnect on the connection class" do + expect(cable_connection_instance).to receive(:disconnect).once + subject + end + end + end +end diff --git a/spec/integration/integration_spec.rb b/spec/integration/integration_spec.rb index 36b0c25c..a6d9c86f 100644 --- a/spec/integration/integration_spec.rb +++ b/spec/integration/integration_spec.rb @@ -2,6 +2,7 @@ require "http" require "benchmark" +require "websocket-client-simple" RSpec.describe "End-to-end" do before :all do @@ -251,4 +252,98 @@ expect(logs.last).to match(/^\[\w{16}\] timestamp=\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\+\d{2}:\d{2} pid=\d+ level=info method=GET path=\/logs\/custom controller=LogsController action=custom hello=world status=204 duration=\d+\.\d+$/) end end + + context "with websockets" do + let(:subscribe_message) { { identifier: { client: "1", channel: "TimeChannel" }.to_json, command: "subscribe" }.to_json } + let(:get_time_message) { { identifier: { client: "1", channel: "TimeChannel" }.to_json, command: "message", data: { action: "what_time_is_it" }.to_json }.to_json } + let(:sync_time_message) { { identifier: { client: "1", channel: "TimeChannel" }.to_json, command: "message", data: { action: "sync_time" }.to_json }.to_json } + let(:remote_sync_time_message) { { identifier: { client: "1", channel: "TimeChannel" }.to_json, command: "message", data: { action: "remote_sync_time", remote: "time.com" }.to_json }.to_json } + + it "rejects a connection from unknown origin" do + with_websocket_connection("ws://localhost:3000/cable") do |client| + expect(client).not_to be_connected + end + end + + it "rejects a connection with no user_id" do + with_websocket_connection("ws://localhost:3000/cable", headers: { Origin: "localhost:3000" }) do |client| + expect(client.messages[0]).to include("unauthorized") + end + end + + it "opens a connection" do + with_websocket_connection("ws://localhost:3000/cable?user_id=1", headers: { Origin: "localhost:3000" }) do |client| + expect(client.messages.count).to eq(1) + expect(client.messages[0]).to include("welcome") + end + end + + it "subscribes to a channel" do + with_websocket_connection("ws://localhost:3000/cable?user_id=1", headers: { Origin: "localhost:3000" }) do |client| + client.send(subscribe_message) + expect(client.messages.count).to eq(3) + expect(client.messages[1]).to include("confirm_subscription") + expect(client.messages[2]).to include("sending_current_time") + end + end + + it "receives messages from the server" do + with_websocket_connection("ws://localhost:3000/cable?user_id=1", headers: { Origin: "localhost:3000" }) do |client| + client.send(subscribe_message) + client.send(get_time_message) + expect(client.messages.last).to include("transmitting_current_time") + end + end + + it "receives broadcasts from the server" do + with_websocket_connection("ws://localhost:3000/cable?user_id=1", headers: { Origin: "localhost:3000" }) do |client| + client.send(subscribe_message) + client.send(sync_time_message) + expect(client.messages.last).to include("broadcasting_current_time") + end + end + + it "receives broadcasts from the server" do + Thread.new do + with_websocket_connection("ws://localhost:3000/cable?user_id=1", headers: { Origin: "localhost:3000" }) do |client| + client.send(subscribe_message) + client.send(sync_time_message) + end + end + + with_websocket_connection("ws://localhost:3000/cable?user_id=2", headers: { Origin: "localhost:3000" }) do |client| + client.send(subscribe_message) + sleep 0.1 + expect(client.messages.last).to include("broadcasting_current_time") + expect(client.messages.last).to include("initiated by user 1") + end + end + + it "requires a subscription to interact with a channel" do + with_websocket_connection("ws://localhost:3000/cable?user_id=2", headers: { Origin: "localhost:3000" }) do |client| + client.send(get_time_message) + expect(client.messages.last).to include("invalid_request") + end + end + + it "processes messages asynchronously" do + skip if ENV["GITHUB_ACTIONS"] + + threads = 3.times.map do + Thread.new do + with_websocket_connection("ws://localhost:3000/cable?user_id=#{rand(100)}", headers: { Origin: "localhost:3000" }) do |client| + client.send(subscribe_message) + client.send(remote_sync_time_message) + end + end + end + + sleep 1.5 + + threads.each do |thread| + client = thread.value + expect(client.messages.last).to include("synced from time.com") + end + end + end end diff --git a/spec/integration/test_app/app/channels/rage_cable/channel.rb b/spec/integration/test_app/app/channels/rage_cable/channel.rb new file mode 100644 index 00000000..15d9eaef --- /dev/null +++ b/spec/integration/test_app/app/channels/rage_cable/channel.rb @@ -0,0 +1,4 @@ +module RageCable + class Channel < Rage::Cable::Channel + end +end diff --git a/spec/integration/test_app/app/channels/rage_cable/connection.rb b/spec/integration/test_app/app/channels/rage_cable/connection.rb new file mode 100644 index 00000000..6e354e85 --- /dev/null +++ b/spec/integration/test_app/app/channels/rage_cable/connection.rb @@ -0,0 +1,15 @@ +module RageCable + class Connection < Rage::Cable::Connection + identified_by :current_user + + def connect + user_id = params[:user_id] + + if user_id + self.current_user = user_id + else + reject_unauthorized_connection + end + end + end +end diff --git a/spec/integration/test_app/app/channels/time_channel.rb b/spec/integration/test_app/app/channels/time_channel.rb new file mode 100644 index 00000000..b98956f5 --- /dev/null +++ b/spec/integration/test_app/app/channels/time_channel.rb @@ -0,0 +1,19 @@ +class TimeChannel < RageCable::Channel + def subscribed + stream_from "current_time" + transmit({ sending_current_time: Time.now.to_i }) + end + + def what_time_is_it + transmit({ transmitting_current_time: Time.now.to_i }) + end + + def sync_time + broadcast("current_time", { broadcasting_current_time: Time.now.to_i, message: "initiated by user #{current_user}" }) + end + + def remote_sync_time(data) + sleep 1 + transmit({ message: "synced from #{data["remote"]}" }) + end +end diff --git a/spec/integration/test_app/config.ru b/spec/integration/test_app/config.ru index 049a1ad5..a307ef47 100644 --- a/spec/integration/test_app/config.ru +++ b/spec/integration/test_app/config.ru @@ -1,3 +1,7 @@ require_relative "config/application" run Rage.application + +map "/cable" do + run Rage.cable.application +end diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index c5f80d04..a84aff3d 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -4,6 +4,7 @@ require_relative "support/request_helper" require_relative "support/controller_helper" require_relative "support/reactor_helper" +require_relative "support/websocket_helper" RSpec.configure do |config| # Enable flags like --only-failures and --next-failure @@ -23,4 +24,5 @@ config.include RequestHelper config.include ControllerHelper config.include ReactorHelper + config.include WebSocketHelper end diff --git a/spec/support/websocket_helper.rb b/spec/support/websocket_helper.rb new file mode 100644 index 00000000..9c58dd67 --- /dev/null +++ b/spec/support/websocket_helper.rb @@ -0,0 +1,64 @@ +# frozen_string_literal: true + +require "websocket-client-simple" + +module WebSocketHelper + def with_websocket_connection(url, headers: {}) + client = WebSocketTestClient.new(url, headers:) + yield client + client + end + + class WebSocketTestClient + def initialize(url, headers: {}) + @url = url + @headers = headers + + ws_data = { connected: false, closed: false, heartbeats: [], messages: [] } + + @ws = WebSocket::Client::Simple.connect(@url, headers: @headers) do |ws| + ws.on :open do + ws_data[:connected] = true + ws_data[:closed] = false + end + + ws.on :message do |msg| + list = msg.to_s.include?("ping") ? ws_data[:heartbeats] : ws_data[:messages] + list << msg.to_s + end + + ws.on :close do + ws_data[:closed] = true + end + end + + @ws_data = ws_data + + sleep 0.1 + end + + def connected? + @ws.handshake.valid? && @ws_data[:connected] && !@ws_data[:closed] + end + + def send(data) + 2.times do + @ws.send(data) + break + rescue Errno::EBADF + puts "Lost websocket connection! Reconnecting..." + @ws.connect(@url, headers: @headers) + end + + sleep 0.1 + end + + def heartbeats + @ws_data[:heartbeats] + end + + def messages + @ws_data[:messages] + end + end +end