%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2021 VMware, Inc. or its affiliates. All rights reserved.
%%
%% @type close_reason(Type) = {shutdown, amqp_reason(Type)}.
%% @type amqp_reason(Type) = {Type, Code, Text}
%% Code = non_neg_integer()
%% Text = binary().
%% @doc This module encapsulates the client's view of an AMQP
%% channel. Each server side channel is represented by an amqp_channel
%% process on the client side. Channel processes are created using the
%% {@link amqp_connection} module. Channel processes are supervised
%% under amqp_client's supervision tree.
%%
%% In case of a failure or an AMQP error, the channel process exits with a
%% meaningful exit reason:
%%
%%
%%
%% Cause |
%% Exit reason |
%%
%%
%% Any reason, where Code would have been 200 otherwise |
%% ```normal''' |
%%
%%
%% User application calls amqp_channel:close/3 |
%% ```close_reason(app_initiated_close)''' |
%%
%%
%% Server closes channel (soft error) |
%% ```close_reason(server_initiated_close)''' |
%%
%%
%% Server misbehaved (did not follow protocol) |
%% ```close_reason(server_misbehaved)''' |
%%
%%
%% Connection is closing (causing all channels to cleanup and
%% close) |
%% ```{shutdown, {connection_closing, amqp_reason(atom())}}''' |
%%
%%
%% Other error |
%% (various error reasons, causing more detailed logging) |
%%
%%
%%
%% See type definitions below.
-module(amqp_channel).
-include("amqp_client_internal.hrl").
-behaviour(gen_server).
-export([call/2, call/3, cast/2, cast/3, cast_flow/3]).
-export([close/1, close/3]).
-export([register_return_handler/2, unregister_return_handler/1,
register_flow_handler/2, unregister_flow_handler/1,
register_confirm_handler/2, unregister_confirm_handler/1]).
-export([call_consumer/2, subscribe/3]).
-export([next_publish_seqno/1, wait_for_confirms/1, wait_for_confirms/2,
wait_for_confirms_or_die/1, wait_for_confirms_or_die/2]).
-export([start_link/5, set_writer/2, connection_closing/3, open/1,
enable_delivery_flow_control/1, notify_received/1]).
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
handle_info/2]).
-define(TIMEOUT_FLUSH, 60000).
-record(state, {number,
connection,
consumer,
driver,
rpc_requests = queue:new(),
closing = false, %% false |
%% {just_channel, Reason} |
%% {connection, Reason}
writer,
return_handler = none,
confirm_handler = none,
next_pub_seqno = 0,
flow_active = true,
flow_handler = none,
unconfirmed_set = gb_sets:new(),
waiting_set = gb_trees:empty(),
only_acks_received = true,
%% true | false, only relevant in the direct
%% client case.
%% when true, consumers will manually notify
%% queue pids using rabbit_amqqueue_common:notify_sent/2
%% to prevent the queue from overwhelming slow
%% consumers that use automatic acknowledgement
%% mode.
delivery_flow_control = false
}).
%%---------------------------------------------------------------------------
%% Type Definitions
%%---------------------------------------------------------------------------
%% @type amqp_method().
%% This abstract datatype represents the set of methods that comprise
%% the AMQP execution model. As indicated in the overview, the
%% attributes of each method in the execution model are described in
%% the protocol documentation. The Erlang record definitions are
%% autogenerated from a parseable version of the specification. Most
%% fields in the generated records have sensible default values that
%% you need not worry in the case of a simple usage of the client
%% library.
%% @type amqp_msg() = #amqp_msg{}.
%% This is the content encapsulated in content-bearing AMQP methods. It
%% contains the following fields:
%%
%% - props :: class_property() - A class property record, defaults to
%% #'P_basic'{}
%% - payload :: binary() - The arbitrary data payload
%%
%%---------------------------------------------------------------------------
%% AMQP Channel API methods
%%---------------------------------------------------------------------------
%% @spec (Channel, Method) -> Result
%% @doc This is equivalent to amqp_channel:call(Channel, Method, none).
call(Channel, Method) ->
gen_server:call(Channel, {call, Method, none, self()}, amqp_util:call_timeout()).
%% @spec (Channel, Method, Content) -> Result
%% where
%% Channel = pid()
%% Method = amqp_method()
%% Content = amqp_msg() | none
%% Result = amqp_method() | ok | blocked | closing
%% @doc This sends an AMQP method on the channel.
%% For content bearing methods, Content has to be an amqp_msg(), whereas
%% for non-content bearing methods, it needs to be the atom 'none'.
%% In the case of synchronous methods, this function blocks until the
%% corresponding reply comes back from the server and returns it.
%% In the case of asynchronous methods, the function blocks until the method
%% gets sent on the wire and returns the atom 'ok' on success.
%% This will return the atom 'blocked' if the server has
%% throttled the client for flow control reasons. This will return the
%% atom 'closing' if the channel is in the process of shutting down.
%% Note that for asynchronous methods, the synchronicity implied by
%% 'call' only means that the client has transmitted the method to
%% the broker. It does not necessarily imply that the broker has
%% accepted responsibility for the message.
call(Channel, Method, Content) ->
gen_server:call(Channel, {call, Method, Content, self()}, amqp_util:call_timeout()).
%% @spec (Channel, Method) -> ok
%% @doc This is equivalent to amqp_channel:cast(Channel, Method, none).
cast(Channel, Method) ->
gen_server:cast(Channel, {cast, Method, none, self(), noflow}).
%% @spec (Channel, Method, Content) -> ok
%% where
%% Channel = pid()
%% Method = amqp_method()
%% Content = amqp_msg() | none
%% @doc This function is the same as {@link call/3}, except that it returns
%% immediately with the atom 'ok', without blocking the caller process.
%% This function is not recommended with synchronous methods, since there is no
%% way to verify that the server has received the method.
cast(Channel, Method, Content) ->
gen_server:cast(Channel, {cast, Method, Content, self(), noflow}).
%% @spec (Channel, Method, Content) -> ok
%% where
%% Channel = pid()
%% Method = amqp_method()
%% Content = amqp_msg() | none
%% @doc Like cast/3, with flow control.
cast_flow(Channel, Method, Content) ->
credit_flow:send(Channel),
gen_server:cast(Channel, {cast, Method, Content, self(), flow}).
%% @spec (Channel) -> ok | closing
%% where
%% Channel = pid()
%% @doc Closes the channel, invokes
%% close(Channel, 200, <<"Goodbye">>).
close(Channel) ->
close(Channel, 200, <<"Goodbye">>).
%% @spec (Channel, Code, Text) -> ok | closing
%% where
%% Channel = pid()
%% Code = integer()
%% Text = binary()
%% @doc Closes the channel, allowing the caller to supply a reply code and
%% text. If the channel is already closing, the atom 'closing' is returned.
close(Channel, Code, Text) ->
gen_server:call(Channel, {close, Code, Text}, amqp_util:call_timeout()).
%% @spec (Channel) -> integer()
%% where
%% Channel = pid()
%% @doc When in confirm mode, returns the sequence number of the next
%% message to be published.
next_publish_seqno(Channel) ->
gen_server:call(Channel, next_publish_seqno, amqp_util:call_timeout()).
%% @spec (Channel) -> boolean() | 'timeout'
%% where
%% Channel = pid()
%% @doc Wait until all messages published since the last call have
%% been either ack'd or nack'd by the broker. Note, when called on a
%% non-Confirm channel, waitForConfirms returns an error.
%% @param Channel: the channel on which to wait.
%% @end
wait_for_confirms(Channel) ->
wait_for_confirms(Channel, ?WAIT_FOR_CONFIRMS_TIMEOUT).
%% @spec (Channel, Timeout) -> boolean() | 'timeout'
%% where
%% Channel = pid()
%% Timeout = non_neg_integer() | {non_neg_integer(), second | millisecond} | 'infinity'
%% @doc Wait until all messages published since the last call have
%% been either ack'd or nack'd by the broker or the timeout expires.
%% Note, when called on a non-Confirm channel, waitForConfirms throws
%% an exception.
%% @param Channel: the channel on which to wait.
%% @param Timeout: the wait timeout in seconds.
%% @end
wait_for_confirms(Channel, {Timeout, second}) ->
do_wait_for_confirms(Channel, second_to_millisecond(Timeout));
wait_for_confirms(Channel, {Timeout, millisecond}) ->
do_wait_for_confirms(Channel, Timeout);
wait_for_confirms(Channel, Timeout) ->
do_wait_for_confirms(Channel, second_to_millisecond(Timeout)).
%% @spec (Channel) -> true
%% where
%% Channel = pid()
%% @doc Behaves the same as wait_for_confirms/1, but if a nack is
%% received, the calling process is immediately sent an
%% exit(nack_received).
%% @param Channel: the channel on which to wait.
%% @end
wait_for_confirms_or_die(Channel) ->
wait_for_confirms_or_die(Channel, ?WAIT_FOR_CONFIRMS_TIMEOUT).
%% @spec (Channel, Timeout) -> true
%% where
%% Channel = pid()
%% Timeout = non_neg_integer() | {non_neg_integer(), second | millisecond} | 'infinity'
%% @doc Behaves the same as wait_for_confirms/1, but if a nack is
%% received, the calling process is immediately sent an
%% exit(nack_received). If the timeout expires, the calling process is
%% sent an exit(timeout).
%% @param Channel: the channel on which to wait.
%% @param Timeout: the wait timeout in seconds.
%% @end
wait_for_confirms_or_die(Channel, Timeout) ->
case wait_for_confirms(Channel, Timeout) of
timeout -> close(Channel, 200, <<"Confirm Timeout">>),
exit(timeout);
false -> close(Channel, 200, <<"Nacks Received">>),
exit(nacks_received);
true -> true
end.
%% @spec (Channel, ReturnHandler) -> ok
%% where
%% Channel = pid()
%% ReturnHandler = pid()
%% @doc This registers a handler to deal with returned messages. The
%% registered process will receive #basic.return{} records.
register_return_handler(Channel, ReturnHandler) ->
gen_server:cast(Channel, {register_return_handler, ReturnHandler} ).
%% @spec (Channel) -> ok
%% where
%% Channel = pid()
%% @doc Removes the return handler, if it exists. Does nothing if there is no
%% such handler.
unregister_return_handler(Channel) ->
gen_server:cast(Channel, unregister_return_handler).
%% @spec (Channel, ConfirmHandler) -> ok
%% where
%% Channel = pid()
%% ConfirmHandler = pid()
%% @doc This registers a handler to deal with confirm-related
%% messages. The registered process will receive #basic.ack{} and
%% #basic.nack{} commands.
register_confirm_handler(Channel, ConfirmHandler) ->
gen_server:cast(Channel, {register_confirm_handler, ConfirmHandler} ).
%% @spec (Channel) -> ok
%% where
%% Channel = pid()
%% @doc Removes the confirm handler, if it exists. Does nothing if there is no
%% such handler.
unregister_confirm_handler(Channel) ->
gen_server:cast(Channel, unregister_confirm_handler).
%% @spec (Channel, FlowHandler) -> ok
%% where
%% Channel = pid()
%% FlowHandler = pid()
%% @doc This registers a handler to deal with channel flow notifications.
%% The registered process will receive #channel.flow{} records.
register_flow_handler(Channel, FlowHandler) ->
gen_server:cast(Channel, {register_flow_handler, FlowHandler} ).
%% @spec (Channel) -> ok
%% where
%% Channel = pid()
%% @doc Removes the flow handler, if it exists. Does nothing if there is no
%% such handler.
unregister_flow_handler(Channel) ->
gen_server:cast(Channel, unregister_flow_handler).
%% @spec (Channel, Msg) -> ok
%% where
%% Channel = pid()
%% Msg = any()
%% @doc This causes the channel to invoke Consumer:handle_call/2,
%% where Consumer is the amqp_gen_consumer implementation registered with
%% the channel.
call_consumer(Channel, Msg) ->
gen_server:call(Channel, {call_consumer, Msg}, amqp_util:call_timeout()).
%% @spec (Channel, BasicConsume, Subscriber) -> ok
%% where
%% Channel = pid()
%% BasicConsume = amqp_method()
%% Subscriber = pid()
%% @doc Subscribe the given pid to a queue using the specified
%% basic.consume method.
subscribe(Channel, BasicConsume = #'basic.consume'{}, Subscriber) ->
gen_server:call(Channel, {subscribe, BasicConsume, Subscriber}, amqp_util:call_timeout()).
%%---------------------------------------------------------------------------
%% Internal interface
%%---------------------------------------------------------------------------
%% @private
start_link(Driver, Connection, ChannelNumber, Consumer, Identity) ->
gen_server:start_link(
?MODULE, [Driver, Connection, ChannelNumber, Consumer, Identity], []).
set_writer(Pid, Writer) ->
gen_server:cast(Pid, {set_writer, Writer}).
enable_delivery_flow_control(Pid) ->
gen_server:cast(Pid, enable_delivery_flow_control).
notify_received({Pid, QPid, ServerChPid}) ->
gen_server:cast(Pid, {send_notify, {QPid, ServerChPid}}).
%% @private
connection_closing(Pid, ChannelCloseType, Reason) ->
gen_server:cast(Pid, {connection_closing, ChannelCloseType, Reason}).
%% @private
open(Pid) ->
gen_server:call(Pid, open, amqp_util:call_timeout()).
%%---------------------------------------------------------------------------
%% gen_server callbacks
%%---------------------------------------------------------------------------
%% @private
init([Driver, Connection, ChannelNumber, Consumer, Identity]) ->
?store_proc_name(Identity),
{ok, #state{connection = Connection,
driver = Driver,
number = ChannelNumber,
consumer = Consumer}}.
%% @private
handle_call(open, From, State) ->
{noreply, rpc_top_half(#'channel.open'{}, none, From, none, noflow, State)};
%% @private
handle_call({close, Code, Text}, From, State) ->
handle_close(Code, Text, From, State);
%% @private
handle_call({call, Method, AmqpMsg, Sender}, From, State) ->
handle_method_to_server(Method, AmqpMsg, From, Sender, noflow, State);
%% Handles the delivery of messages from a direct channel
%% @private
handle_call({send_command_sync, Method, Content}, From, State) ->
Ret = handle_method_from_server(Method, Content, State),
gen_server:reply(From, ok),
Ret;
%% Handles the delivery of messages from a direct channel
%% @private
handle_call({send_command_sync, Method}, From, State) ->
Ret = handle_method_from_server(Method, none, State),
gen_server:reply(From, ok),
Ret;
%% @private
handle_call(next_publish_seqno, _From,
State = #state{next_pub_seqno = SeqNo}) ->
{reply, SeqNo, State};
handle_call({wait_for_confirms, Timeout}, From, State) ->
handle_wait_for_confirms(From, Timeout, State);
%% @private
handle_call({call_consumer, Msg}, _From,
State = #state{consumer = Consumer}) ->
{reply, amqp_gen_consumer:call_consumer(Consumer, Msg), State};
%% @private
handle_call({subscribe, BasicConsume, Subscriber}, From, State) ->
handle_method_to_server(BasicConsume, none, From, Subscriber, noflow,
State).
%% @private
handle_cast({set_writer, Writer}, State = #state{driver = direct}) ->
link(Writer),
{noreply, State#state{writer = Writer}};
handle_cast({set_writer, Writer}, State) ->
{noreply, State#state{writer = Writer}};
%% @private
handle_cast(enable_delivery_flow_control, State) ->
{noreply, State#state{delivery_flow_control = true}};
%% @private
handle_cast({send_notify, {QPid, ChPid}}, State) ->
rabbit_amqqueue_common:notify_sent(QPid, ChPid),
{noreply, State};
%% @private
handle_cast({cast, Method, AmqpMsg, Sender, noflow}, State) ->
handle_method_to_server(Method, AmqpMsg, none, Sender, noflow, State);
handle_cast({cast, Method, AmqpMsg, Sender, flow}, State) ->
credit_flow:ack(Sender),
handle_method_to_server(Method, AmqpMsg, none, Sender, flow, State);
%% @private
handle_cast({register_return_handler, ReturnHandler}, State) ->
Ref = erlang:monitor(process, ReturnHandler),
{noreply, State#state{return_handler = {ReturnHandler, Ref}}};
%% @private
handle_cast(unregister_return_handler,
State = #state{return_handler = {_ReturnHandler, Ref}}) ->
erlang:demonitor(Ref),
{noreply, State#state{return_handler = none}};
%% @private
handle_cast({register_confirm_handler, ConfirmHandler}, State) ->
Ref = erlang:monitor(process, ConfirmHandler),
{noreply, State#state{confirm_handler = {ConfirmHandler, Ref}}};
%% @private
handle_cast(unregister_confirm_handler,
State = #state{confirm_handler = {_ConfirmHandler, Ref}}) ->
erlang:demonitor(Ref),
{noreply, State#state{confirm_handler = none}};
%% @private
handle_cast({register_flow_handler, FlowHandler}, State) ->
Ref = erlang:monitor(process, FlowHandler),
{noreply, State#state{flow_handler = {FlowHandler, Ref}}};
%% @private
handle_cast(unregister_flow_handler,
State = #state{flow_handler = {_FlowHandler, Ref}}) ->
erlang:demonitor(Ref),
{noreply, State#state{flow_handler = none}};
%% Received from channels manager
%% @private
handle_cast({method, Method, Content, noflow}, State) ->
handle_method_from_server(Method, Content, State);
%% Handles the situation when the connection closes without closing the channel
%% beforehand. The channel must block all further RPCs,
%% flush the RPC queue (optional), and terminate
%% @private
handle_cast({connection_closing, CloseType, Reason}, State) ->
handle_connection_closing(CloseType, Reason, State);
%% @private
handle_cast({shutdown, Shutdown}, State) ->
handle_shutdown(Shutdown, State).
%% Received from rabbit_channel in the direct case
%% @private
handle_info({send_command, Method}, State) ->
handle_method_from_server(Method, none, State);
%% Received from rabbit_channel in the direct case
%% @private
handle_info({send_command, Method, Content}, State) ->
handle_method_from_server(Method, Content, State);
%% Received from rabbit_channel in the direct case
%% @private
handle_info({send_command_and_notify, QPid, ChPid,
Method = #'basic.deliver'{}, Content},
State = #state{delivery_flow_control = MFC}) ->
case MFC of
false -> handle_method_from_server(Method, Content, State),
rabbit_amqqueue_common:notify_sent(QPid, ChPid);
true -> handle_method_from_server(Method, Content,
{self(), QPid, ChPid}, State)
end,
{noreply, State};
%% This comes from the writer or rabbit_channel
%% @private
handle_info({channel_exit, _ChNumber, Reason}, State) ->
handle_channel_exit(Reason, State);
%% This comes from rabbit_channel in the direct case
handle_info({channel_closing, ChPid}, State) ->
ok = rabbit_channel_common:ready_for_close(ChPid),
{noreply, State};
%% @private
handle_info({bump_credit, Msg}, State) ->
credit_flow:handle_bump_msg(Msg),
{noreply, State};
%% @private
handle_info(timed_out_flushing_channel, State) ->
?LOG_WARN("Channel (~p) closing: timed out flushing while "
"connection closing", [self()]),
{stop, timed_out_flushing_channel, State};
%% @private
handle_info({'DOWN', _, process, ReturnHandler, shutdown},
State = #state{return_handler = {ReturnHandler, _Ref}}) ->
{noreply, State#state{return_handler = none}};
handle_info({'DOWN', _, process, ReturnHandler, Reason},
State = #state{return_handler = {ReturnHandler, _Ref}}) ->
?LOG_WARN("Channel (~p): Unregistering return handler ~p because it died. "
"Reason: ~p", [self(), ReturnHandler, Reason]),
{noreply, State#state{return_handler = none}};
%% @private
handle_info({'DOWN', _, process, ConfirmHandler, shutdown},
State = #state{confirm_handler = {ConfirmHandler, _Ref}}) ->
{noreply, State#state{confirm_handler = none}};
handle_info({'DOWN', _, process, ConfirmHandler, Reason},
State = #state{confirm_handler = {ConfirmHandler, _Ref}}) ->
?LOG_WARN("Channel (~p): Unregistering confirm handler ~p because it died. "
"Reason: ~p", [self(), ConfirmHandler, Reason]),
{noreply, State#state{confirm_handler = none}};
%% @private
handle_info({'DOWN', _, process, FlowHandler, shutdown},
State = #state{flow_handler = {FlowHandler, _Ref}}) ->
{noreply, State#state{flow_handler = none}};
handle_info({'DOWN', _, process, FlowHandler, Reason},
State = #state{flow_handler = {FlowHandler, _Ref}}) ->
?LOG_WARN("Channel (~p): Unregistering flow handler ~p because it died. "
"Reason: ~p", [self(), FlowHandler, Reason]),
{noreply, State#state{flow_handler = none}};
handle_info({'DOWN', _, process, QPid, _Reason}, State) ->
rabbit_amqqueue_common:notify_sent_queue_down(QPid),
{noreply, State};
handle_info({confirm_timeout, From}, State = #state{waiting_set = WSet}) ->
case gb_trees:lookup(From, WSet) of
none ->
{noreply, State};
{value, _} ->
gen_server:reply(From, timeout),
{noreply, State#state{waiting_set = gb_trees:delete(From, WSet)}}
end.
%% @private
terminate(_Reason, State) ->
flush_writer(State),
State.
%% @private
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%---------------------------------------------------------------------------
%% RPC mechanism
%%---------------------------------------------------------------------------
handle_method_to_server(Method, AmqpMsg, From, Sender, Flow,
State = #state{unconfirmed_set = USet}) ->
case {check_invalid_method(Method), From,
check_block(Method, AmqpMsg, State)} of
{ok, _, ok} ->
State1 = case {Method, State#state.next_pub_seqno} of
{#'confirm.select'{}, 0} ->
%% The confirm seqno is set to 1 on the
%% first confirm.select only.
State#state{next_pub_seqno = 1};
{#'basic.publish'{}, 0} ->
State;
{#'basic.publish'{}, SeqNo} ->
State#state{unconfirmed_set =
gb_sets:add(SeqNo, USet),
next_pub_seqno = SeqNo + 1};
_ ->
State
end,
{noreply, rpc_top_half(Method, build_content(AmqpMsg),
From, Sender, Flow, State1)};
{ok, none, BlockReply} ->
?LOG_WARN("Channel (~p): discarding method ~p in cast.~n"
"Reason: ~p", [self(), Method, BlockReply]),
{noreply, State};
{ok, _, BlockReply} ->
{reply, BlockReply, State};
{{_, InvalidMethodMessage}, none, _} ->
?LOG_WARN("Channel (~p): ignoring cast of ~p method. " ++
InvalidMethodMessage ++ "", [self(), Method]),
{noreply, State};
{{InvalidMethodReply, _}, _, _} ->
{reply, {error, InvalidMethodReply}, State}
end.
handle_close(Code, Text, From, State) ->
Close = #'channel.close'{reply_code = Code,
reply_text = Text,
class_id = 0,
method_id = 0},
case check_block(Close, none, State) of
ok -> {noreply, rpc_top_half(Close, none, From, none, noflow,
State)};
BlockReply -> {reply, BlockReply, State}
end.
rpc_top_half(Method, Content, From, Sender, Flow,
State0 = #state{rpc_requests = RequestQueue}) ->
State1 = State0#state{
rpc_requests = queue:in({From, Sender, Method, Content, Flow},
RequestQueue)},
IsFirstElement = queue:is_empty(RequestQueue),
if IsFirstElement -> do_rpc(State1);
true -> State1
end.
rpc_bottom_half(Reply, State = #state{rpc_requests = RequestQueue}) ->
{{value, {From, _Sender, _Method, _Content, _Flow}}, RequestQueue1} =
queue:out(RequestQueue),
case From of
none -> ok;
_ -> gen_server:reply(From, Reply)
end,
do_rpc(State#state{rpc_requests = RequestQueue1}).
do_rpc(State = #state{rpc_requests = Q,
closing = Closing}) ->
case queue:out(Q) of
{{value, {From, Sender, Method, Content, Flow}}, NewQ} ->
State1 = pre_do(Method, Content, Sender, State),
DoRet = do(Method, Content, Flow, State1),
case ?PROTOCOL:is_method_synchronous(Method) of
true -> State1;
false -> case {From, DoRet} of
{none, _} -> ok;
{_, ok} -> gen_server:reply(From, ok);
_ -> ok
%% Do not reply if error in do. Expecting
%% {channel_exit, _, _}
end,
do_rpc(State1#state{rpc_requests = NewQ})
end;
{empty, NewQ} ->
case Closing of
{connection, Reason} ->
gen_server:cast(self(),
{shutdown, {connection_closing, Reason}});
_ ->
ok
end,
State#state{rpc_requests = NewQ}
end.
pending_rpc_method(#state{rpc_requests = Q}) ->
{value, {_From, _Sender, Method, _Content, _Flow}} = queue:peek(Q),
Method.
pre_do(#'channel.close'{reply_code = Code, reply_text = Text}, none,
_Sender, State) ->
State#state{closing = {just_channel, {app_initiated_close, Code, Text}}};
pre_do(#'basic.consume'{} = Method, none, Sender, State) ->
ok = call_to_consumer(Method, Sender, State),
State;
pre_do(#'basic.cancel'{} = Method, none, Sender, State) ->
ok = call_to_consumer(Method, Sender, State),
State;
pre_do(_, _, _, State) ->
State.
%%---------------------------------------------------------------------------
%% Handling of methods from the server
%%---------------------------------------------------------------------------
safely_handle_method_from_server(Method, Content,
Continuation,
State = #state{closing = Closing}) ->
case is_connection_method(Method) of
true -> server_misbehaved(
#amqp_error{name = command_invalid,
explanation = "connection method on "
"non-zero channel",
method = element(1, Method)},
State);
false -> Drop = case {Closing, Method} of
{{just_channel, _}, #'channel.close'{}} -> false;
{{just_channel, _}, #'channel.close_ok'{}} -> false;
{{just_channel, _}, _} -> true;
_ -> false
end,
if Drop -> ?LOG_INFO("Channel (~p): dropping method ~p from "
"server because channel is closing",
[self(), {Method, Content}]),
{noreply, State};
true ->
Continuation()
end
end.
handle_method_from_server(Method, Content, State) ->
Fun = fun () ->
handle_method_from_server1(Method,
amqp_msg(Content), State)
end,
safely_handle_method_from_server(Method, Content, Fun, State).
handle_method_from_server(Method = #'basic.deliver'{},
Content, DeliveryCtx, State) ->
Fun = fun () ->
handle_method_from_server1(Method,
amqp_msg(Content),
DeliveryCtx,
State)
end,
safely_handle_method_from_server(Method, Content, Fun, State).
handle_method_from_server1(#'channel.open_ok'{}, none, State) ->
{noreply, rpc_bottom_half(ok, State)};
handle_method_from_server1(#'channel.close'{reply_code = Code,
reply_text = Text},
none,
State = #state{closing = {just_channel, _}}) ->
%% Both client and server sent close at the same time. Don't shutdown yet,
%% wait for close_ok.
do(#'channel.close_ok'{}, none, noflow, State),
{noreply,
State#state{
closing = {just_channel, {server_initiated_close, Code, Text}}}};
handle_method_from_server1(#'channel.close'{reply_code = Code,
reply_text = Text}, none, State) ->
do(#'channel.close_ok'{}, none, noflow, State),
handle_shutdown({server_initiated_close, Code, Text}, State);
handle_method_from_server1(#'channel.close_ok'{}, none,
State = #state{closing = Closing}) ->
case Closing of
{just_channel, {app_initiated_close, _, _} = Reason} ->
handle_shutdown(Reason, rpc_bottom_half(ok, State));
{just_channel, {server_initiated_close, _, _} = Reason} ->
handle_shutdown(Reason,
rpc_bottom_half(closing, State));
{connection, Reason} ->
handle_shutdown({connection_closing, Reason}, State)
end;
handle_method_from_server1(#'basic.consume_ok'{} = ConsumeOk, none, State) ->
Consume = #'basic.consume'{} = pending_rpc_method(State),
ok = call_to_consumer(ConsumeOk, Consume, State),
{noreply, rpc_bottom_half(ConsumeOk, State)};
handle_method_from_server1(#'basic.cancel_ok'{} = CancelOk, none, State) ->
Cancel = #'basic.cancel'{} = pending_rpc_method(State),
ok = call_to_consumer(CancelOk, Cancel, State),
{noreply, rpc_bottom_half(CancelOk, State)};
handle_method_from_server1(#'basic.cancel'{} = Cancel, none, State) ->
ok = call_to_consumer(Cancel, none, State),
{noreply, State};
handle_method_from_server1(#'basic.deliver'{} = Deliver, AmqpMsg, State) ->
ok = call_to_consumer(Deliver, AmqpMsg, State),
{noreply, State};
handle_method_from_server1(#'channel.flow'{active = Active} = Flow, none,
State = #state{flow_handler = FlowHandler}) ->
case FlowHandler of none -> ok;
{Pid, _Ref} -> Pid ! Flow
end,
%% Putting the flow_ok in the queue so that the RPC queue can be
%% flushed beforehand. Methods that made it to the queue are not
%% blocked in any circumstance.
{noreply, rpc_top_half(#'channel.flow_ok'{active = Active}, none, none,
none, noflow, State#state{flow_active = Active})};
handle_method_from_server1(
#'basic.return'{} = BasicReturn, AmqpMsg,
State = #state{return_handler = ReturnHandler}) ->
case ReturnHandler of
none -> ?LOG_WARN("Channel (~p): received {~p, ~p} but there is "
"no return handler registered",
[self(), BasicReturn, AmqpMsg]);
{Pid, _Ref} -> Pid ! {BasicReturn, AmqpMsg}
end,
{noreply, State};
handle_method_from_server1(#'basic.ack'{} = BasicAck, none,
#state{confirm_handler = none} = State) ->
{noreply, update_confirm_set(BasicAck, State)};
handle_method_from_server1(#'basic.ack'{} = BasicAck, none,
#state{confirm_handler = {CH, _Ref}} = State) ->
CH ! BasicAck,
{noreply, update_confirm_set(BasicAck, State)};
handle_method_from_server1(#'basic.nack'{} = BasicNack, none,
#state{confirm_handler = none} = State) ->
?LOG_WARN("Channel (~p): received ~p but there is no "
"confirm handler registered", [self(), BasicNack]),
{noreply, update_confirm_set(BasicNack, State)};
handle_method_from_server1(#'basic.nack'{} = BasicNack, none,
#state{confirm_handler = {CH, _Ref}} = State) ->
CH ! BasicNack,
{noreply, update_confirm_set(BasicNack, State)};
handle_method_from_server1(#'basic.credit_drained'{} = CreditDrained, none,
#state{consumer = Consumer} = State) ->
Consumer ! CreditDrained,
{noreply, State};
handle_method_from_server1(Method, none, State) ->
{noreply, rpc_bottom_half(Method, State)};
handle_method_from_server1(Method, Content, State) ->
{noreply, rpc_bottom_half({Method, Content}, State)}.
%% only used with manual consumer-to-queue flow control
handle_method_from_server1(#'basic.deliver'{} = Deliver, AmqpMsg,
DeliveryCtx, State) ->
ok = call_to_consumer(Deliver, AmqpMsg, DeliveryCtx, State),
{noreply, State}.
%%---------------------------------------------------------------------------
%% Other handle_* functions
%%---------------------------------------------------------------------------
handle_connection_closing(CloseType, Reason,
State = #state{rpc_requests = RpcQueue,
closing = Closing}) ->
NewState = State#state{closing = {connection, Reason}},
case {CloseType, Closing, queue:is_empty(RpcQueue)} of
{flush, false, false} ->
erlang:send_after(?TIMEOUT_FLUSH, self(),
timed_out_flushing_channel),
{noreply, NewState};
{flush, {just_channel, _}, false} ->
{noreply, NewState};
_ ->
handle_shutdown({connection_closing, Reason}, NewState)
end.
handle_channel_exit(Reason = #amqp_error{name = ErrorName, explanation = Expl},
State = #state{connection = Connection, number = Number}) ->
%% Sent by rabbit_channel for hard errors in the direct case
?LOG_ERR("connection ~p, channel ~p - error:~n~p",
[Connection, Number, Reason]),
{true, Code, _} = ?PROTOCOL:lookup_amqp_exception(ErrorName),
ReportedReason = {server_initiated_close, Code, Expl},
amqp_gen_connection:hard_error_in_channel(
Connection, self(), ReportedReason),
handle_shutdown({connection_closing, ReportedReason}, State);
handle_channel_exit(Reason, State) ->
%% Unexpected death of a channel infrastructure process
{stop, {infrastructure_died, Reason}, State}.
handle_shutdown({_, 200, _}, State) ->
{stop, normal, State};
handle_shutdown({connection_closing, {_, 200, _}}, State) ->
{stop, normal, State};
handle_shutdown({connection_closing, normal}, State) ->
{stop, normal, State};
handle_shutdown(Reason, State) ->
{stop, {shutdown, Reason}, State}.
%%---------------------------------------------------------------------------
%% Internal plumbing
%%---------------------------------------------------------------------------
do(Method, Content, Flow, #state{driver = network, writer = W}) ->
%% Catching because it expects the {channel_exit, _, _} message on error
catch case {Content, Flow} of
{none, _} -> rabbit_writer:send_command(W, Method);
{_, flow} -> rabbit_writer:send_command_flow(W, Method,
Content);
{_, noflow} -> rabbit_writer:send_command(W, Method, Content)
end;
do(Method, Content, Flow, #state{driver = direct, writer = W}) ->
%% ditto catching because...
catch case {Content, Flow} of
{none, _} -> rabbit_channel_common:do(W, Method);
{_, flow} -> rabbit_channel_common:do_flow(W, Method, Content);
{_, noflow} -> rabbit_channel_common:do(W, Method, Content)
end.
flush_writer(#state{driver = network, writer = Writer}) ->
try
rabbit_writer:flush(Writer)
catch
exit:noproc -> ok
end;
flush_writer(#state{driver = direct}) ->
ok.
amqp_msg(none) ->
none;
amqp_msg(Content) ->
{Props, Payload} = rabbit_basic_common:from_content(Content),
#amqp_msg{props = Props, payload = Payload}.
build_content(none) ->
none;
build_content(#amqp_msg{props = Props, payload = Payload}) ->
rabbit_basic_common:build_content(Props, Payload).
check_block(_Method, _AmqpMsg, #state{closing = {just_channel, _}}) ->
closing;
check_block(_Method, _AmqpMsg, #state{closing = {connection, _}}) ->
closing;
check_block(_Method, none, #state{}) ->
ok;
check_block(_Method, #amqp_msg{}, #state{flow_active = false}) ->
blocked;
check_block(_Method, _AmqpMsg, #state{}) ->
ok.
check_invalid_method(#'channel.open'{}) ->
{use_amqp_connection_module,
"Use amqp_connection:open_channel/{1,2} instead"};
check_invalid_method(#'channel.close'{}) ->
{use_close_function, "Use close/{1,3} instead"};
check_invalid_method(Method) ->
case is_connection_method(Method) of
true -> {connection_methods_not_allowed,
"Sending connection methods is not allowed"};
false -> ok
end.
is_connection_method(Method) ->
{ClassId, _} = ?PROTOCOL:method_id(element(1, Method)),
?PROTOCOL:lookup_class_name(ClassId) == connection.
server_misbehaved(#amqp_error{} = AmqpError, State = #state{number = Number}) ->
case rabbit_binary_generator:map_exception(Number, AmqpError, ?PROTOCOL) of
{0, _} ->
handle_shutdown({server_misbehaved, AmqpError}, State);
{_, Close} ->
?LOG_WARN("Channel (~p) flushing and closing due to soft "
"error caused by the server ~p", [self(), AmqpError]),
Self = self(),
spawn(fun () -> call(Self, Close) end),
{noreply, State}
end.
update_confirm_set(#'basic.ack'{delivery_tag = SeqNo,
multiple = Multiple},
State = #state{unconfirmed_set = USet}) ->
maybe_notify_waiters(
State#state{unconfirmed_set =
update_unconfirmed(SeqNo, Multiple, USet)});
update_confirm_set(#'basic.nack'{delivery_tag = SeqNo,
multiple = Multiple},
State = #state{unconfirmed_set = USet}) ->
maybe_notify_waiters(
State#state{unconfirmed_set = update_unconfirmed(SeqNo, Multiple, USet),
only_acks_received = false}).
update_unconfirmed(SeqNo, false, USet) ->
gb_sets:del_element(SeqNo, USet);
update_unconfirmed(SeqNo, true, USet) ->
case gb_sets:is_empty(USet) of
true -> USet;
false -> {S, USet1} = gb_sets:take_smallest(USet),
case S > SeqNo of
true -> USet;
false -> update_unconfirmed(SeqNo, true, USet1)
end
end.
maybe_notify_waiters(State = #state{unconfirmed_set = USet}) ->
case gb_sets:is_empty(USet) of
false -> State;
true -> notify_confirm_waiters(State)
end.
notify_confirm_waiters(State = #state{waiting_set = WSet,
only_acks_received = OAR}) ->
[begin
safe_cancel_timer(TRef),
gen_server:reply(From, OAR)
end || {From, TRef} <- gb_trees:to_list(WSet)],
State#state{waiting_set = gb_trees:empty(),
only_acks_received = true}.
do_wait_for_confirms(Channel, Timeout) when is_integer(Timeout) ->
case gen_server:call(Channel, {wait_for_confirms, Timeout}, amqp_util:call_timeout()) of
{error, Reason} -> throw(Reason);
Other -> Other
end.
handle_wait_for_confirms(_From, _Timeout, State = #state{next_pub_seqno = 0}) ->
{reply, {error, not_in_confirm_mode}, State};
handle_wait_for_confirms(From, Timeout,
State = #state{unconfirmed_set = USet,
waiting_set = WSet}) ->
case gb_sets:is_empty(USet) of
true -> {reply, true, State};
false -> TRef = case Timeout of
infinity -> undefined;
_ -> erlang:send_after(
Timeout, self(),
{confirm_timeout, From})
end,
{noreply,
State#state{waiting_set = gb_trees:insert(From, TRef, WSet)}}
end.
call_to_consumer(Method, Args, #state{consumer = Consumer}) ->
amqp_gen_consumer:call_consumer(Consumer, Method, Args).
call_to_consumer(Method, Args, DeliveryCtx, #state{consumer = Consumer}) ->
amqp_gen_consumer:call_consumer(Consumer, Method, Args, DeliveryCtx).
safe_cancel_timer(undefined) -> ok;
safe_cancel_timer(TRef) -> erlang:cancel_timer(TRef).
second_to_millisecond(Timeout) ->
Timeout * 1000.