%% This Source Code Form is subject to the terms of the Mozilla Public %% License, v. 2.0. If a copy of the MPL was not distributed with this %% file, You can obtain one at https://mozilla.org/MPL/2.0/. %% %% Copyright (c) 2007-2021 VMware, Inc. or its affiliates. All rights reserved. %% %% @type close_reason(Type) = {shutdown, amqp_reason(Type)}. %% @type amqp_reason(Type) = {Type, Code, Text} %% Code = non_neg_integer() %% Text = binary(). %% @doc This module encapsulates the client's view of an AMQP %% channel. Each server side channel is represented by an amqp_channel %% process on the client side. Channel processes are created using the %% {@link amqp_connection} module. Channel processes are supervised %% under amqp_client's supervision tree.
%%
%% In case of a failure or an AMQP error, the channel process exits with a %% meaningful exit reason:
%%
%% %% %% %% %% %% %% %% %% %% %% %% %% %% %% %% %% %% %% %% %% %% %% %% %% %% %% %% %% %%
CauseExit reason
Any reason, where Code would have been 200 otherwise```normal'''
User application calls amqp_channel:close/3```close_reason(app_initiated_close)'''
Server closes channel (soft error)```close_reason(server_initiated_close)'''
Server misbehaved (did not follow protocol)```close_reason(server_misbehaved)'''
Connection is closing (causing all channels to cleanup and %% close)```{shutdown, {connection_closing, amqp_reason(atom())}}'''
Other error(various error reasons, causing more detailed logging)
%%
%% See type definitions below. -module(amqp_channel). -include("amqp_client_internal.hrl"). -behaviour(gen_server). -export([call/2, call/3, cast/2, cast/3, cast_flow/3]). -export([close/1, close/3]). -export([register_return_handler/2, unregister_return_handler/1, register_flow_handler/2, unregister_flow_handler/1, register_confirm_handler/2, unregister_confirm_handler/1]). -export([call_consumer/2, subscribe/3]). -export([next_publish_seqno/1, wait_for_confirms/1, wait_for_confirms/2, wait_for_confirms_or_die/1, wait_for_confirms_or_die/2]). -export([start_link/5, set_writer/2, connection_closing/3, open/1, enable_delivery_flow_control/1, notify_received/1]). -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2]). -define(TIMEOUT_FLUSH, 60000). -record(state, {number, connection, consumer, driver, rpc_requests = queue:new(), closing = false, %% false | %% {just_channel, Reason} | %% {connection, Reason} writer, return_handler = none, confirm_handler = none, next_pub_seqno = 0, flow_active = true, flow_handler = none, unconfirmed_set = gb_sets:new(), waiting_set = gb_trees:empty(), only_acks_received = true, %% true | false, only relevant in the direct %% client case. %% when true, consumers will manually notify %% queue pids using rabbit_amqqueue_common:notify_sent/2 %% to prevent the queue from overwhelming slow %% consumers that use automatic acknowledgement %% mode. delivery_flow_control = false }). %%--------------------------------------------------------------------------- %% Type Definitions %%--------------------------------------------------------------------------- %% @type amqp_method(). %% This abstract datatype represents the set of methods that comprise %% the AMQP execution model. As indicated in the overview, the %% attributes of each method in the execution model are described in %% the protocol documentation. The Erlang record definitions are %% autogenerated from a parseable version of the specification. Most %% fields in the generated records have sensible default values that %% you need not worry in the case of a simple usage of the client %% library. %% @type amqp_msg() = #amqp_msg{}. %% This is the content encapsulated in content-bearing AMQP methods. It %% contains the following fields: %% %%--------------------------------------------------------------------------- %% AMQP Channel API methods %%--------------------------------------------------------------------------- %% @spec (Channel, Method) -> Result %% @doc This is equivalent to amqp_channel:call(Channel, Method, none). call(Channel, Method) -> gen_server:call(Channel, {call, Method, none, self()}, amqp_util:call_timeout()). %% @spec (Channel, Method, Content) -> Result %% where %% Channel = pid() %% Method = amqp_method() %% Content = amqp_msg() | none %% Result = amqp_method() | ok | blocked | closing %% @doc This sends an AMQP method on the channel. %% For content bearing methods, Content has to be an amqp_msg(), whereas %% for non-content bearing methods, it needs to be the atom 'none'.
%% In the case of synchronous methods, this function blocks until the %% corresponding reply comes back from the server and returns it. %% In the case of asynchronous methods, the function blocks until the method %% gets sent on the wire and returns the atom 'ok' on success.
%% This will return the atom 'blocked' if the server has %% throttled the client for flow control reasons. This will return the %% atom 'closing' if the channel is in the process of shutting down.
%% Note that for asynchronous methods, the synchronicity implied by %% 'call' only means that the client has transmitted the method to %% the broker. It does not necessarily imply that the broker has %% accepted responsibility for the message. call(Channel, Method, Content) -> gen_server:call(Channel, {call, Method, Content, self()}, amqp_util:call_timeout()). %% @spec (Channel, Method) -> ok %% @doc This is equivalent to amqp_channel:cast(Channel, Method, none). cast(Channel, Method) -> gen_server:cast(Channel, {cast, Method, none, self(), noflow}). %% @spec (Channel, Method, Content) -> ok %% where %% Channel = pid() %% Method = amqp_method() %% Content = amqp_msg() | none %% @doc This function is the same as {@link call/3}, except that it returns %% immediately with the atom 'ok', without blocking the caller process. %% This function is not recommended with synchronous methods, since there is no %% way to verify that the server has received the method. cast(Channel, Method, Content) -> gen_server:cast(Channel, {cast, Method, Content, self(), noflow}). %% @spec (Channel, Method, Content) -> ok %% where %% Channel = pid() %% Method = amqp_method() %% Content = amqp_msg() | none %% @doc Like cast/3, with flow control. cast_flow(Channel, Method, Content) -> credit_flow:send(Channel), gen_server:cast(Channel, {cast, Method, Content, self(), flow}). %% @spec (Channel) -> ok | closing %% where %% Channel = pid() %% @doc Closes the channel, invokes %% close(Channel, 200, <<"Goodbye">>). close(Channel) -> close(Channel, 200, <<"Goodbye">>). %% @spec (Channel, Code, Text) -> ok | closing %% where %% Channel = pid() %% Code = integer() %% Text = binary() %% @doc Closes the channel, allowing the caller to supply a reply code and %% text. If the channel is already closing, the atom 'closing' is returned. close(Channel, Code, Text) -> gen_server:call(Channel, {close, Code, Text}, amqp_util:call_timeout()). %% @spec (Channel) -> integer() %% where %% Channel = pid() %% @doc When in confirm mode, returns the sequence number of the next %% message to be published. next_publish_seqno(Channel) -> gen_server:call(Channel, next_publish_seqno, amqp_util:call_timeout()). %% @spec (Channel) -> boolean() | 'timeout' %% where %% Channel = pid() %% @doc Wait until all messages published since the last call have %% been either ack'd or nack'd by the broker. Note, when called on a %% non-Confirm channel, waitForConfirms returns an error. %% @param Channel: the channel on which to wait. %% @end wait_for_confirms(Channel) -> wait_for_confirms(Channel, ?WAIT_FOR_CONFIRMS_TIMEOUT). %% @spec (Channel, Timeout) -> boolean() | 'timeout' %% where %% Channel = pid() %% Timeout = non_neg_integer() | {non_neg_integer(), second | millisecond} | 'infinity' %% @doc Wait until all messages published since the last call have %% been either ack'd or nack'd by the broker or the timeout expires. %% Note, when called on a non-Confirm channel, waitForConfirms throws %% an exception. %% @param Channel: the channel on which to wait. %% @param Timeout: the wait timeout in seconds. %% @end wait_for_confirms(Channel, {Timeout, second}) -> do_wait_for_confirms(Channel, second_to_millisecond(Timeout)); wait_for_confirms(Channel, {Timeout, millisecond}) -> do_wait_for_confirms(Channel, Timeout); wait_for_confirms(Channel, Timeout) -> do_wait_for_confirms(Channel, second_to_millisecond(Timeout)). %% @spec (Channel) -> true %% where %% Channel = pid() %% @doc Behaves the same as wait_for_confirms/1, but if a nack is %% received, the calling process is immediately sent an %% exit(nack_received). %% @param Channel: the channel on which to wait. %% @end wait_for_confirms_or_die(Channel) -> wait_for_confirms_or_die(Channel, ?WAIT_FOR_CONFIRMS_TIMEOUT). %% @spec (Channel, Timeout) -> true %% where %% Channel = pid() %% Timeout = non_neg_integer() | {non_neg_integer(), second | millisecond} | 'infinity' %% @doc Behaves the same as wait_for_confirms/1, but if a nack is %% received, the calling process is immediately sent an %% exit(nack_received). If the timeout expires, the calling process is %% sent an exit(timeout). %% @param Channel: the channel on which to wait. %% @param Timeout: the wait timeout in seconds. %% @end wait_for_confirms_or_die(Channel, Timeout) -> case wait_for_confirms(Channel, Timeout) of timeout -> close(Channel, 200, <<"Confirm Timeout">>), exit(timeout); false -> close(Channel, 200, <<"Nacks Received">>), exit(nacks_received); true -> true end. %% @spec (Channel, ReturnHandler) -> ok %% where %% Channel = pid() %% ReturnHandler = pid() %% @doc This registers a handler to deal with returned messages. The %% registered process will receive #basic.return{} records. register_return_handler(Channel, ReturnHandler) -> gen_server:cast(Channel, {register_return_handler, ReturnHandler} ). %% @spec (Channel) -> ok %% where %% Channel = pid() %% @doc Removes the return handler, if it exists. Does nothing if there is no %% such handler. unregister_return_handler(Channel) -> gen_server:cast(Channel, unregister_return_handler). %% @spec (Channel, ConfirmHandler) -> ok %% where %% Channel = pid() %% ConfirmHandler = pid() %% @doc This registers a handler to deal with confirm-related %% messages. The registered process will receive #basic.ack{} and %% #basic.nack{} commands. register_confirm_handler(Channel, ConfirmHandler) -> gen_server:cast(Channel, {register_confirm_handler, ConfirmHandler} ). %% @spec (Channel) -> ok %% where %% Channel = pid() %% @doc Removes the confirm handler, if it exists. Does nothing if there is no %% such handler. unregister_confirm_handler(Channel) -> gen_server:cast(Channel, unregister_confirm_handler). %% @spec (Channel, FlowHandler) -> ok %% where %% Channel = pid() %% FlowHandler = pid() %% @doc This registers a handler to deal with channel flow notifications. %% The registered process will receive #channel.flow{} records. register_flow_handler(Channel, FlowHandler) -> gen_server:cast(Channel, {register_flow_handler, FlowHandler} ). %% @spec (Channel) -> ok %% where %% Channel = pid() %% @doc Removes the flow handler, if it exists. Does nothing if there is no %% such handler. unregister_flow_handler(Channel) -> gen_server:cast(Channel, unregister_flow_handler). %% @spec (Channel, Msg) -> ok %% where %% Channel = pid() %% Msg = any() %% @doc This causes the channel to invoke Consumer:handle_call/2, %% where Consumer is the amqp_gen_consumer implementation registered with %% the channel. call_consumer(Channel, Msg) -> gen_server:call(Channel, {call_consumer, Msg}, amqp_util:call_timeout()). %% @spec (Channel, BasicConsume, Subscriber) -> ok %% where %% Channel = pid() %% BasicConsume = amqp_method() %% Subscriber = pid() %% @doc Subscribe the given pid to a queue using the specified %% basic.consume method. subscribe(Channel, BasicConsume = #'basic.consume'{}, Subscriber) -> gen_server:call(Channel, {subscribe, BasicConsume, Subscriber}, amqp_util:call_timeout()). %%--------------------------------------------------------------------------- %% Internal interface %%--------------------------------------------------------------------------- %% @private start_link(Driver, Connection, ChannelNumber, Consumer, Identity) -> gen_server:start_link( ?MODULE, [Driver, Connection, ChannelNumber, Consumer, Identity], []). set_writer(Pid, Writer) -> gen_server:cast(Pid, {set_writer, Writer}). enable_delivery_flow_control(Pid) -> gen_server:cast(Pid, enable_delivery_flow_control). notify_received({Pid, QPid, ServerChPid}) -> gen_server:cast(Pid, {send_notify, {QPid, ServerChPid}}). %% @private connection_closing(Pid, ChannelCloseType, Reason) -> gen_server:cast(Pid, {connection_closing, ChannelCloseType, Reason}). %% @private open(Pid) -> gen_server:call(Pid, open, amqp_util:call_timeout()). %%--------------------------------------------------------------------------- %% gen_server callbacks %%--------------------------------------------------------------------------- %% @private init([Driver, Connection, ChannelNumber, Consumer, Identity]) -> ?store_proc_name(Identity), {ok, #state{connection = Connection, driver = Driver, number = ChannelNumber, consumer = Consumer}}. %% @private handle_call(open, From, State) -> {noreply, rpc_top_half(#'channel.open'{}, none, From, none, noflow, State)}; %% @private handle_call({close, Code, Text}, From, State) -> handle_close(Code, Text, From, State); %% @private handle_call({call, Method, AmqpMsg, Sender}, From, State) -> handle_method_to_server(Method, AmqpMsg, From, Sender, noflow, State); %% Handles the delivery of messages from a direct channel %% @private handle_call({send_command_sync, Method, Content}, From, State) -> Ret = handle_method_from_server(Method, Content, State), gen_server:reply(From, ok), Ret; %% Handles the delivery of messages from a direct channel %% @private handle_call({send_command_sync, Method}, From, State) -> Ret = handle_method_from_server(Method, none, State), gen_server:reply(From, ok), Ret; %% @private handle_call(next_publish_seqno, _From, State = #state{next_pub_seqno = SeqNo}) -> {reply, SeqNo, State}; handle_call({wait_for_confirms, Timeout}, From, State) -> handle_wait_for_confirms(From, Timeout, State); %% @private handle_call({call_consumer, Msg}, _From, State = #state{consumer = Consumer}) -> {reply, amqp_gen_consumer:call_consumer(Consumer, Msg), State}; %% @private handle_call({subscribe, BasicConsume, Subscriber}, From, State) -> handle_method_to_server(BasicConsume, none, From, Subscriber, noflow, State). %% @private handle_cast({set_writer, Writer}, State = #state{driver = direct}) -> link(Writer), {noreply, State#state{writer = Writer}}; handle_cast({set_writer, Writer}, State) -> {noreply, State#state{writer = Writer}}; %% @private handle_cast(enable_delivery_flow_control, State) -> {noreply, State#state{delivery_flow_control = true}}; %% @private handle_cast({send_notify, {QPid, ChPid}}, State) -> rabbit_amqqueue_common:notify_sent(QPid, ChPid), {noreply, State}; %% @private handle_cast({cast, Method, AmqpMsg, Sender, noflow}, State) -> handle_method_to_server(Method, AmqpMsg, none, Sender, noflow, State); handle_cast({cast, Method, AmqpMsg, Sender, flow}, State) -> credit_flow:ack(Sender), handle_method_to_server(Method, AmqpMsg, none, Sender, flow, State); %% @private handle_cast({register_return_handler, ReturnHandler}, State) -> Ref = erlang:monitor(process, ReturnHandler), {noreply, State#state{return_handler = {ReturnHandler, Ref}}}; %% @private handle_cast(unregister_return_handler, State = #state{return_handler = {_ReturnHandler, Ref}}) -> erlang:demonitor(Ref), {noreply, State#state{return_handler = none}}; %% @private handle_cast({register_confirm_handler, ConfirmHandler}, State) -> Ref = erlang:monitor(process, ConfirmHandler), {noreply, State#state{confirm_handler = {ConfirmHandler, Ref}}}; %% @private handle_cast(unregister_confirm_handler, State = #state{confirm_handler = {_ConfirmHandler, Ref}}) -> erlang:demonitor(Ref), {noreply, State#state{confirm_handler = none}}; %% @private handle_cast({register_flow_handler, FlowHandler}, State) -> Ref = erlang:monitor(process, FlowHandler), {noreply, State#state{flow_handler = {FlowHandler, Ref}}}; %% @private handle_cast(unregister_flow_handler, State = #state{flow_handler = {_FlowHandler, Ref}}) -> erlang:demonitor(Ref), {noreply, State#state{flow_handler = none}}; %% Received from channels manager %% @private handle_cast({method, Method, Content, noflow}, State) -> handle_method_from_server(Method, Content, State); %% Handles the situation when the connection closes without closing the channel %% beforehand. The channel must block all further RPCs, %% flush the RPC queue (optional), and terminate %% @private handle_cast({connection_closing, CloseType, Reason}, State) -> handle_connection_closing(CloseType, Reason, State); %% @private handle_cast({shutdown, Shutdown}, State) -> handle_shutdown(Shutdown, State). %% Received from rabbit_channel in the direct case %% @private handle_info({send_command, Method}, State) -> handle_method_from_server(Method, none, State); %% Received from rabbit_channel in the direct case %% @private handle_info({send_command, Method, Content}, State) -> handle_method_from_server(Method, Content, State); %% Received from rabbit_channel in the direct case %% @private handle_info({send_command_and_notify, QPid, ChPid, Method = #'basic.deliver'{}, Content}, State = #state{delivery_flow_control = MFC}) -> case MFC of false -> handle_method_from_server(Method, Content, State), rabbit_amqqueue_common:notify_sent(QPid, ChPid); true -> handle_method_from_server(Method, Content, {self(), QPid, ChPid}, State) end, {noreply, State}; %% This comes from the writer or rabbit_channel %% @private handle_info({channel_exit, _ChNumber, Reason}, State) -> handle_channel_exit(Reason, State); %% This comes from rabbit_channel in the direct case handle_info({channel_closing, ChPid}, State) -> ok = rabbit_channel_common:ready_for_close(ChPid), {noreply, State}; %% @private handle_info({bump_credit, Msg}, State) -> credit_flow:handle_bump_msg(Msg), {noreply, State}; %% @private handle_info(timed_out_flushing_channel, State) -> ?LOG_WARN("Channel (~p) closing: timed out flushing while " "connection closing", [self()]), {stop, timed_out_flushing_channel, State}; %% @private handle_info({'DOWN', _, process, ReturnHandler, shutdown}, State = #state{return_handler = {ReturnHandler, _Ref}}) -> {noreply, State#state{return_handler = none}}; handle_info({'DOWN', _, process, ReturnHandler, Reason}, State = #state{return_handler = {ReturnHandler, _Ref}}) -> ?LOG_WARN("Channel (~p): Unregistering return handler ~p because it died. " "Reason: ~p", [self(), ReturnHandler, Reason]), {noreply, State#state{return_handler = none}}; %% @private handle_info({'DOWN', _, process, ConfirmHandler, shutdown}, State = #state{confirm_handler = {ConfirmHandler, _Ref}}) -> {noreply, State#state{confirm_handler = none}}; handle_info({'DOWN', _, process, ConfirmHandler, Reason}, State = #state{confirm_handler = {ConfirmHandler, _Ref}}) -> ?LOG_WARN("Channel (~p): Unregistering confirm handler ~p because it died. " "Reason: ~p", [self(), ConfirmHandler, Reason]), {noreply, State#state{confirm_handler = none}}; %% @private handle_info({'DOWN', _, process, FlowHandler, shutdown}, State = #state{flow_handler = {FlowHandler, _Ref}}) -> {noreply, State#state{flow_handler = none}}; handle_info({'DOWN', _, process, FlowHandler, Reason}, State = #state{flow_handler = {FlowHandler, _Ref}}) -> ?LOG_WARN("Channel (~p): Unregistering flow handler ~p because it died. " "Reason: ~p", [self(), FlowHandler, Reason]), {noreply, State#state{flow_handler = none}}; handle_info({'DOWN', _, process, QPid, _Reason}, State) -> rabbit_amqqueue_common:notify_sent_queue_down(QPid), {noreply, State}; handle_info({confirm_timeout, From}, State = #state{waiting_set = WSet}) -> case gb_trees:lookup(From, WSet) of none -> {noreply, State}; {value, _} -> gen_server:reply(From, timeout), {noreply, State#state{waiting_set = gb_trees:delete(From, WSet)}} end. %% @private terminate(_Reason, State) -> flush_writer(State), State. %% @private code_change(_OldVsn, State, _Extra) -> {ok, State}. %%--------------------------------------------------------------------------- %% RPC mechanism %%--------------------------------------------------------------------------- handle_method_to_server(Method, AmqpMsg, From, Sender, Flow, State = #state{unconfirmed_set = USet}) -> case {check_invalid_method(Method), From, check_block(Method, AmqpMsg, State)} of {ok, _, ok} -> State1 = case {Method, State#state.next_pub_seqno} of {#'confirm.select'{}, 0} -> %% The confirm seqno is set to 1 on the %% first confirm.select only. State#state{next_pub_seqno = 1}; {#'basic.publish'{}, 0} -> State; {#'basic.publish'{}, SeqNo} -> State#state{unconfirmed_set = gb_sets:add(SeqNo, USet), next_pub_seqno = SeqNo + 1}; _ -> State end, {noreply, rpc_top_half(Method, build_content(AmqpMsg), From, Sender, Flow, State1)}; {ok, none, BlockReply} -> ?LOG_WARN("Channel (~p): discarding method ~p in cast.~n" "Reason: ~p", [self(), Method, BlockReply]), {noreply, State}; {ok, _, BlockReply} -> {reply, BlockReply, State}; {{_, InvalidMethodMessage}, none, _} -> ?LOG_WARN("Channel (~p): ignoring cast of ~p method. " ++ InvalidMethodMessage ++ "", [self(), Method]), {noreply, State}; {{InvalidMethodReply, _}, _, _} -> {reply, {error, InvalidMethodReply}, State} end. handle_close(Code, Text, From, State) -> Close = #'channel.close'{reply_code = Code, reply_text = Text, class_id = 0, method_id = 0}, case check_block(Close, none, State) of ok -> {noreply, rpc_top_half(Close, none, From, none, noflow, State)}; BlockReply -> {reply, BlockReply, State} end. rpc_top_half(Method, Content, From, Sender, Flow, State0 = #state{rpc_requests = RequestQueue}) -> State1 = State0#state{ rpc_requests = queue:in({From, Sender, Method, Content, Flow}, RequestQueue)}, IsFirstElement = queue:is_empty(RequestQueue), if IsFirstElement -> do_rpc(State1); true -> State1 end. rpc_bottom_half(Reply, State = #state{rpc_requests = RequestQueue}) -> {{value, {From, _Sender, _Method, _Content, _Flow}}, RequestQueue1} = queue:out(RequestQueue), case From of none -> ok; _ -> gen_server:reply(From, Reply) end, do_rpc(State#state{rpc_requests = RequestQueue1}). do_rpc(State = #state{rpc_requests = Q, closing = Closing}) -> case queue:out(Q) of {{value, {From, Sender, Method, Content, Flow}}, NewQ} -> State1 = pre_do(Method, Content, Sender, State), DoRet = do(Method, Content, Flow, State1), case ?PROTOCOL:is_method_synchronous(Method) of true -> State1; false -> case {From, DoRet} of {none, _} -> ok; {_, ok} -> gen_server:reply(From, ok); _ -> ok %% Do not reply if error in do. Expecting %% {channel_exit, _, _} end, do_rpc(State1#state{rpc_requests = NewQ}) end; {empty, NewQ} -> case Closing of {connection, Reason} -> gen_server:cast(self(), {shutdown, {connection_closing, Reason}}); _ -> ok end, State#state{rpc_requests = NewQ} end. pending_rpc_method(#state{rpc_requests = Q}) -> {value, {_From, _Sender, Method, _Content, _Flow}} = queue:peek(Q), Method. pre_do(#'channel.close'{reply_code = Code, reply_text = Text}, none, _Sender, State) -> State#state{closing = {just_channel, {app_initiated_close, Code, Text}}}; pre_do(#'basic.consume'{} = Method, none, Sender, State) -> ok = call_to_consumer(Method, Sender, State), State; pre_do(#'basic.cancel'{} = Method, none, Sender, State) -> ok = call_to_consumer(Method, Sender, State), State; pre_do(_, _, _, State) -> State. %%--------------------------------------------------------------------------- %% Handling of methods from the server %%--------------------------------------------------------------------------- safely_handle_method_from_server(Method, Content, Continuation, State = #state{closing = Closing}) -> case is_connection_method(Method) of true -> server_misbehaved( #amqp_error{name = command_invalid, explanation = "connection method on " "non-zero channel", method = element(1, Method)}, State); false -> Drop = case {Closing, Method} of {{just_channel, _}, #'channel.close'{}} -> false; {{just_channel, _}, #'channel.close_ok'{}} -> false; {{just_channel, _}, _} -> true; _ -> false end, if Drop -> ?LOG_INFO("Channel (~p): dropping method ~p from " "server because channel is closing", [self(), {Method, Content}]), {noreply, State}; true -> Continuation() end end. handle_method_from_server(Method, Content, State) -> Fun = fun () -> handle_method_from_server1(Method, amqp_msg(Content), State) end, safely_handle_method_from_server(Method, Content, Fun, State). handle_method_from_server(Method = #'basic.deliver'{}, Content, DeliveryCtx, State) -> Fun = fun () -> handle_method_from_server1(Method, amqp_msg(Content), DeliveryCtx, State) end, safely_handle_method_from_server(Method, Content, Fun, State). handle_method_from_server1(#'channel.open_ok'{}, none, State) -> {noreply, rpc_bottom_half(ok, State)}; handle_method_from_server1(#'channel.close'{reply_code = Code, reply_text = Text}, none, State = #state{closing = {just_channel, _}}) -> %% Both client and server sent close at the same time. Don't shutdown yet, %% wait for close_ok. do(#'channel.close_ok'{}, none, noflow, State), {noreply, State#state{ closing = {just_channel, {server_initiated_close, Code, Text}}}}; handle_method_from_server1(#'channel.close'{reply_code = Code, reply_text = Text}, none, State) -> do(#'channel.close_ok'{}, none, noflow, State), handle_shutdown({server_initiated_close, Code, Text}, State); handle_method_from_server1(#'channel.close_ok'{}, none, State = #state{closing = Closing}) -> case Closing of {just_channel, {app_initiated_close, _, _} = Reason} -> handle_shutdown(Reason, rpc_bottom_half(ok, State)); {just_channel, {server_initiated_close, _, _} = Reason} -> handle_shutdown(Reason, rpc_bottom_half(closing, State)); {connection, Reason} -> handle_shutdown({connection_closing, Reason}, State) end; handle_method_from_server1(#'basic.consume_ok'{} = ConsumeOk, none, State) -> Consume = #'basic.consume'{} = pending_rpc_method(State), ok = call_to_consumer(ConsumeOk, Consume, State), {noreply, rpc_bottom_half(ConsumeOk, State)}; handle_method_from_server1(#'basic.cancel_ok'{} = CancelOk, none, State) -> Cancel = #'basic.cancel'{} = pending_rpc_method(State), ok = call_to_consumer(CancelOk, Cancel, State), {noreply, rpc_bottom_half(CancelOk, State)}; handle_method_from_server1(#'basic.cancel'{} = Cancel, none, State) -> ok = call_to_consumer(Cancel, none, State), {noreply, State}; handle_method_from_server1(#'basic.deliver'{} = Deliver, AmqpMsg, State) -> ok = call_to_consumer(Deliver, AmqpMsg, State), {noreply, State}; handle_method_from_server1(#'channel.flow'{active = Active} = Flow, none, State = #state{flow_handler = FlowHandler}) -> case FlowHandler of none -> ok; {Pid, _Ref} -> Pid ! Flow end, %% Putting the flow_ok in the queue so that the RPC queue can be %% flushed beforehand. Methods that made it to the queue are not %% blocked in any circumstance. {noreply, rpc_top_half(#'channel.flow_ok'{active = Active}, none, none, none, noflow, State#state{flow_active = Active})}; handle_method_from_server1( #'basic.return'{} = BasicReturn, AmqpMsg, State = #state{return_handler = ReturnHandler}) -> case ReturnHandler of none -> ?LOG_WARN("Channel (~p): received {~p, ~p} but there is " "no return handler registered", [self(), BasicReturn, AmqpMsg]); {Pid, _Ref} -> Pid ! {BasicReturn, AmqpMsg} end, {noreply, State}; handle_method_from_server1(#'basic.ack'{} = BasicAck, none, #state{confirm_handler = none} = State) -> {noreply, update_confirm_set(BasicAck, State)}; handle_method_from_server1(#'basic.ack'{} = BasicAck, none, #state{confirm_handler = {CH, _Ref}} = State) -> CH ! BasicAck, {noreply, update_confirm_set(BasicAck, State)}; handle_method_from_server1(#'basic.nack'{} = BasicNack, none, #state{confirm_handler = none} = State) -> ?LOG_WARN("Channel (~p): received ~p but there is no " "confirm handler registered", [self(), BasicNack]), {noreply, update_confirm_set(BasicNack, State)}; handle_method_from_server1(#'basic.nack'{} = BasicNack, none, #state{confirm_handler = {CH, _Ref}} = State) -> CH ! BasicNack, {noreply, update_confirm_set(BasicNack, State)}; handle_method_from_server1(#'basic.credit_drained'{} = CreditDrained, none, #state{consumer = Consumer} = State) -> Consumer ! CreditDrained, {noreply, State}; handle_method_from_server1(Method, none, State) -> {noreply, rpc_bottom_half(Method, State)}; handle_method_from_server1(Method, Content, State) -> {noreply, rpc_bottom_half({Method, Content}, State)}. %% only used with manual consumer-to-queue flow control handle_method_from_server1(#'basic.deliver'{} = Deliver, AmqpMsg, DeliveryCtx, State) -> ok = call_to_consumer(Deliver, AmqpMsg, DeliveryCtx, State), {noreply, State}. %%--------------------------------------------------------------------------- %% Other handle_* functions %%--------------------------------------------------------------------------- handle_connection_closing(CloseType, Reason, State = #state{rpc_requests = RpcQueue, closing = Closing}) -> NewState = State#state{closing = {connection, Reason}}, case {CloseType, Closing, queue:is_empty(RpcQueue)} of {flush, false, false} -> erlang:send_after(?TIMEOUT_FLUSH, self(), timed_out_flushing_channel), {noreply, NewState}; {flush, {just_channel, _}, false} -> {noreply, NewState}; _ -> handle_shutdown({connection_closing, Reason}, NewState) end. handle_channel_exit(Reason = #amqp_error{name = ErrorName, explanation = Expl}, State = #state{connection = Connection, number = Number}) -> %% Sent by rabbit_channel for hard errors in the direct case ?LOG_ERR("connection ~p, channel ~p - error:~n~p", [Connection, Number, Reason]), {true, Code, _} = ?PROTOCOL:lookup_amqp_exception(ErrorName), ReportedReason = {server_initiated_close, Code, Expl}, amqp_gen_connection:hard_error_in_channel( Connection, self(), ReportedReason), handle_shutdown({connection_closing, ReportedReason}, State); handle_channel_exit(Reason, State) -> %% Unexpected death of a channel infrastructure process {stop, {infrastructure_died, Reason}, State}. handle_shutdown({_, 200, _}, State) -> {stop, normal, State}; handle_shutdown({connection_closing, {_, 200, _}}, State) -> {stop, normal, State}; handle_shutdown({connection_closing, normal}, State) -> {stop, normal, State}; handle_shutdown(Reason, State) -> {stop, {shutdown, Reason}, State}. %%--------------------------------------------------------------------------- %% Internal plumbing %%--------------------------------------------------------------------------- do(Method, Content, Flow, #state{driver = network, writer = W}) -> %% Catching because it expects the {channel_exit, _, _} message on error catch case {Content, Flow} of {none, _} -> rabbit_writer:send_command(W, Method); {_, flow} -> rabbit_writer:send_command_flow(W, Method, Content); {_, noflow} -> rabbit_writer:send_command(W, Method, Content) end; do(Method, Content, Flow, #state{driver = direct, writer = W}) -> %% ditto catching because... catch case {Content, Flow} of {none, _} -> rabbit_channel_common:do(W, Method); {_, flow} -> rabbit_channel_common:do_flow(W, Method, Content); {_, noflow} -> rabbit_channel_common:do(W, Method, Content) end. flush_writer(#state{driver = network, writer = Writer}) -> try rabbit_writer:flush(Writer) catch exit:noproc -> ok end; flush_writer(#state{driver = direct}) -> ok. amqp_msg(none) -> none; amqp_msg(Content) -> {Props, Payload} = rabbit_basic_common:from_content(Content), #amqp_msg{props = Props, payload = Payload}. build_content(none) -> none; build_content(#amqp_msg{props = Props, payload = Payload}) -> rabbit_basic_common:build_content(Props, Payload). check_block(_Method, _AmqpMsg, #state{closing = {just_channel, _}}) -> closing; check_block(_Method, _AmqpMsg, #state{closing = {connection, _}}) -> closing; check_block(_Method, none, #state{}) -> ok; check_block(_Method, #amqp_msg{}, #state{flow_active = false}) -> blocked; check_block(_Method, _AmqpMsg, #state{}) -> ok. check_invalid_method(#'channel.open'{}) -> {use_amqp_connection_module, "Use amqp_connection:open_channel/{1,2} instead"}; check_invalid_method(#'channel.close'{}) -> {use_close_function, "Use close/{1,3} instead"}; check_invalid_method(Method) -> case is_connection_method(Method) of true -> {connection_methods_not_allowed, "Sending connection methods is not allowed"}; false -> ok end. is_connection_method(Method) -> {ClassId, _} = ?PROTOCOL:method_id(element(1, Method)), ?PROTOCOL:lookup_class_name(ClassId) == connection. server_misbehaved(#amqp_error{} = AmqpError, State = #state{number = Number}) -> case rabbit_binary_generator:map_exception(Number, AmqpError, ?PROTOCOL) of {0, _} -> handle_shutdown({server_misbehaved, AmqpError}, State); {_, Close} -> ?LOG_WARN("Channel (~p) flushing and closing due to soft " "error caused by the server ~p", [self(), AmqpError]), Self = self(), spawn(fun () -> call(Self, Close) end), {noreply, State} end. update_confirm_set(#'basic.ack'{delivery_tag = SeqNo, multiple = Multiple}, State = #state{unconfirmed_set = USet}) -> maybe_notify_waiters( State#state{unconfirmed_set = update_unconfirmed(SeqNo, Multiple, USet)}); update_confirm_set(#'basic.nack'{delivery_tag = SeqNo, multiple = Multiple}, State = #state{unconfirmed_set = USet}) -> maybe_notify_waiters( State#state{unconfirmed_set = update_unconfirmed(SeqNo, Multiple, USet), only_acks_received = false}). update_unconfirmed(SeqNo, false, USet) -> gb_sets:del_element(SeqNo, USet); update_unconfirmed(SeqNo, true, USet) -> case gb_sets:is_empty(USet) of true -> USet; false -> {S, USet1} = gb_sets:take_smallest(USet), case S > SeqNo of true -> USet; false -> update_unconfirmed(SeqNo, true, USet1) end end. maybe_notify_waiters(State = #state{unconfirmed_set = USet}) -> case gb_sets:is_empty(USet) of false -> State; true -> notify_confirm_waiters(State) end. notify_confirm_waiters(State = #state{waiting_set = WSet, only_acks_received = OAR}) -> [begin safe_cancel_timer(TRef), gen_server:reply(From, OAR) end || {From, TRef} <- gb_trees:to_list(WSet)], State#state{waiting_set = gb_trees:empty(), only_acks_received = true}. do_wait_for_confirms(Channel, Timeout) when is_integer(Timeout) -> case gen_server:call(Channel, {wait_for_confirms, Timeout}, amqp_util:call_timeout()) of {error, Reason} -> throw(Reason); Other -> Other end. handle_wait_for_confirms(_From, _Timeout, State = #state{next_pub_seqno = 0}) -> {reply, {error, not_in_confirm_mode}, State}; handle_wait_for_confirms(From, Timeout, State = #state{unconfirmed_set = USet, waiting_set = WSet}) -> case gb_sets:is_empty(USet) of true -> {reply, true, State}; false -> TRef = case Timeout of infinity -> undefined; _ -> erlang:send_after( Timeout, self(), {confirm_timeout, From}) end, {noreply, State#state{waiting_set = gb_trees:insert(From, TRef, WSet)}} end. call_to_consumer(Method, Args, #state{consumer = Consumer}) -> amqp_gen_consumer:call_consumer(Consumer, Method, Args). call_to_consumer(Method, Args, DeliveryCtx, #state{consumer = Consumer}) -> amqp_gen_consumer:call_consumer(Consumer, Method, Args, DeliveryCtx). safe_cancel_timer(undefined) -> ok; safe_cancel_timer(TRef) -> erlang:cancel_timer(TRef). second_to_millisecond(Timeout) -> Timeout * 1000.