Skip to content

Commit

Permalink
fix issue #10 - Should send message to client on (re-)connect or hand…
Browse files Browse the repository at this point in the history
…le re-subscribes
  • Loading branch information
Feng Lee committed May 18, 2015
1 parent 340d9d6 commit 48569d0
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 29 deletions.
6 changes: 6 additions & 0 deletions examples/gen_server/run
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
#!/bin/sh
# -*- tab-width:4;indent-tabs-mode:nil -*-
# ex: ts=4 sw=4 et

erl -pa ebin -pa ../../ebin -pa ../../deps/*/ebin -smp true -s gen_server_example start_link

14 changes: 14 additions & 0 deletions examples/gen_server/src/gen_server_example.app.src
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
{application, gen_server_example,
[
{description, "emqttc gen_server example"},
{vsn, "1.0"},
{modules, [
gen_server_example
]},
{registered, []},
{applications, [
kernel,
stdlib
]},
{env, []}
]}.
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
-module(gen_server_example).

-behaviour(gen_server).

-define(SERVER, ?MODULE).

%% ------------------------------------------------------------------
Expand Down Expand Up @@ -34,9 +36,10 @@ stop() ->
init(_Args) ->
{ok, C} = emqttc:start_link([{host, "localhost"},
{client_id, <<"simpleClient">>},
{logger, info}]),
{reconnect, 3},
{logger, {console, info}}]),
%% the pending subscribe
emqttc:subscribe(C, <<"TopicA">>, 1),
self() ! publish,
{ok, #state{mqttc = C, seq = 1}}.

handle_call(stop, _From, State) ->
Expand All @@ -48,17 +51,32 @@ handle_call(_Request, _From, State) ->
handle_cast(_Msg, State) ->
{noreply, State}.

%% Publish Messages
handle_info(publish, State = #state{mqttc = C, seq = I}) ->
Payload = list_to_binary(["hello...", integer_to_list(I)]),
emqttc:publish(C, <<"TopicA">>, Payload, [{qos, 1}]),
erlang:send_after(1000, self(), publish),
emqttc:publish(C, <<"TopicB">>, Payload, [{qos, 2}]),
erlang:send_after(3000, self(), publish),
{noreply, State#state{seq = I+1}};

%% Receive
%% Receive Messages
handle_info({publish, Topic, Payload}, State) ->
io:format("Message from ~s: ~p~n", [Topic, Payload]),
{noreply, State};

%% Client connected
handle_info({mqttc, C, connected}, State = #state{mqttc = C}) ->
io:format("Client ~p is connected~n", [C]),
emqttc:subscribe(C, <<"TopicA">>, 1),
emqttc:subscribe(C, <<"TopicB">>, 2),
self() ! publish,
{noreply, State};

%% Client disconnected
handle_info({mqttc, C, disconnected}, State = #state{mqttc = C}) ->
io:format("Client ~p is disconnected~n", [C]),
{noreply, State};

handle_info(_Info, State) ->
{noreply, State}.

Expand Down
88 changes: 63 additions & 25 deletions src/emqttc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
code_change/4]).

%% fsm state
-export([connecting/2, connecting/3,
-export([connecting/2,
waiting_for_connack/2, waiting_for_connack/3,
connected/2, connected/3,
disconnected/2, disconnected/3]).
Expand All @@ -72,13 +72,15 @@
| {username, binary()}
| {password, binary()}
| {will, list(tuple())}
| {connack_timeout, pos_integer()}
| ssl
| {logger, atom() | {atom(), atom()}}
| {reconnect, non_neg_integer() | {non_neg_integer(), non_neg_integer()} | false}.

-type mqtt_pubopt() :: {qos, mqtt_qos()} | {retain, boolean()}.

-record(state, {
parent :: pid(),
name :: atom(),
host = "localhost" :: inet:ip_address() | string(),
port = 1883 :: inet:port_number(),
Expand All @@ -91,10 +93,15 @@
pending_pubsub = [] :: list(),
keepalive :: emqttc_keepalive:keepalive() | undefined,
keepalive_time = 60 :: non_neg_integer(),
connack_timeout :: pos_integer(),
connack_tref :: reference(),
transport = tcp :: tcp | ssl,
reconnector :: emqttc_reconnector:reconnector() | undefined,
logger :: gen_logger:logmod()}).

%% seconds
-define(CONNACK_TIMEOUT, 30).

-define(KEEPALIVE_EVENT, {keepalive, timeout}).

-define(RECONNECT_EVENT, {reconnect, timeout}).
Expand Down Expand Up @@ -127,7 +134,7 @@ start_link() ->
MqttOpts :: [mqttc_opt()],
Client :: pid().
start_link(MqttOpts) when is_list(MqttOpts) ->
gen_fsm:start_link(?MODULE, [undefined, MqttOpts], []).
gen_fsm:start_link(?MODULE, [undefined, self(), MqttOpts], []).

%%------------------------------------------------------------------------------
%% @doc Start emqttc client with name, options.
Expand All @@ -137,7 +144,7 @@ start_link(MqttOpts) when is_list(MqttOpts) ->
Name :: atom(),
MqttOpts :: [mqttc_opt()].
start_link(Name, MqttOpts) when is_atom(Name), is_list(MqttOpts) ->
gen_fsm:start_link({local, Name}, ?MODULE, [Name, MqttOpts], []).
gen_fsm:start_link({local, Name}, ?MODULE, [Name, self(), MqttOpts], []).

%%------------------------------------------------------------------------------
%% @doc Publish message to broker with QoS0.
Expand Down Expand Up @@ -249,10 +256,10 @@ disconnect(Client) ->
{ok, StateName :: atom(), StateData :: #state{}} |
{ok, StateName :: atom(), StateData :: #state{}, timeout() | hibernate} |
{stop, Reason :: term()} | ignore).
init([undefined, MqttOpts]) ->
init([pid_to_list(self()), MqttOpts]);
init([undefined, Parent, MqttOpts]) ->
init([pid_to_list(self()), Parent, MqttOpts]);

init([Name, MqttOpts]) ->
init([Name, Parent, MqttOpts]) ->

process_flag(trap_exit, true),

Expand All @@ -270,11 +277,13 @@ init([Name, MqttOpts]) ->
IsSSL = get_value(ssl, MqttOpts1, false),

State = init(MqttOpts1, #state{
parent = Parent,
name = Name,
host = "localhost",
port = if IsSSL -> 8883;
true -> 1883 end,
proto_state = ProtoState,
connack_timeout = ?CONNACK_TIMEOUT,
logger = Logger}),

{ok, connecting, State, 0}.
Expand All @@ -292,6 +301,8 @@ init([{logger, Cfg} | Opts], State) ->
init(Opts, State#state{logger = gen_logger:new(Cfg)});
init([{keepalive, Time} | Opts], State) ->
init(Opts, State#state{keepalive_time = Time});
init([{connack_timeout, Timeout}| Opts], State) ->
init(Opts, State#state{connack_timeout = Timeout});
init([{reconnect, ReconnOpt} | Opts], State) ->
init(Opts, State#state{reconnector = init_reconnector(ReconnOpt)});
init([_Opt | Opts], State) ->
Expand All @@ -308,39 +319,43 @@ init_reconnector(Params) when is_integer(Params) orelse is_tuple(Params) ->
%% @end
%%------------------------------------------------------------------------------
connecting(timeout, State) ->
connect(State);

connecting(Event, State = #state{name = Name, logger = Logger}) ->
Logger:warning("[Client ~s] Unexpected Event: ~p, when connecting.", [Name, Event]),
{next_state, connecting, State}.

%%------------------------------------------------------------------------------
%% @private
%% @doc Sync event Handler for state that connecting to MQTT broker.
%% @end
%%------------------------------------------------------------------------------
connecting(Event, From, State = #state{name = Name, logger = Logger}) ->
Logger:warning("[Client ~s] Unexpected Sync Event from ~p when connecting: ~p", [Name, From, Event]),
{reply, {error, connecting}, connecting, State}.
connect(State).

%%------------------------------------------------------------------------------
%% @private
%% @doc Event Handler for state that waiting_for_connack from MQTT broker.
%% @end
%%------------------------------------------------------------------------------
waiting_for_connack(?CONNACK_PACKET(?CONNACK_ACCEPT), State = #state{
parent = Parent,
name = Name,
pending_pubsub = Pending,
proto_state = ProtoState,
keepalive = KeepAlive,
connack_tref = TRef,
logger = Logger}) ->
Logger:info("[Client ~s] RECV: CONNACK_ACCEPT", [Name]),

%% cancel connack timer
if
TRef =:= undefined -> ok;
true -> gen_fsm:cancel_timer(TRef)
end,

{ok, ProtoState1} = emqttc_protocol:received('CONNACK', ProtoState),

%% send the pending pubsub
[gen_fsm:send_event(self(), Event) || Event <- lists:reverse(Pending)],

%% start keepalive
KeepAlive1 = emqttc_keepalive:start(KeepAlive),
{next_state, connected, State#state{proto_state = ProtoState1,
keepalive = KeepAlive1,

%% tell parent to subscribe
Parent ! {mqttc, self(), connected},

{next_state, connected, State#state{proto_state = ProtoState1,
keepalive = KeepAlive1,
connack_tref = undefined,
pending_pubsub = []}};

waiting_for_connack(?CONNACK_PACKET(ReturnCode), State = #state{name = Name, logger = Logger}) ->
Expand All @@ -364,6 +379,10 @@ waiting_for_connack(disconnect, State=#state{receiver = Receiver, proto_state =
emqttc_socket:stop(Receiver),
{stop, normal, State#state{socket = undefined, receiver = undefined}};

waiting_for_connack({timeout, TRef, connack}, State = #state{name = Name, logger = Logger, connack_tref = TRef}) ->
Logger:error("[Client ~s] CONNACK Timeout!", [Name]),
{stop, {shutdown, connack_timeout}, State};

waiting_for_connack(Event, State = #state{name = Name, logger = Logger}) ->
Logger:warning("[Client ~s] Unexpected Event: ~p, when waiting for connack!", [Name, Event]),
{next_state, waiting_for_connack, State}.
Expand Down Expand Up @@ -505,6 +524,7 @@ disconnected(Event = {publish, _Msg}, State) ->

disconnected(Event = {Tag, _From, _Topics}, State) when
Tag =:= subscribe orelse Tag =:= unsubscribe ->
io:format("Pending event for client disconnected: ~p~n", [Event]),
{next_state, disconnected, pending(Event, State)};

disconnected(disconnect, State) ->
Expand Down Expand Up @@ -539,10 +559,25 @@ disconnected(Event, _From, State = #state{name = Name, logger = Logger}) ->
timeout() | hibernate} |
{stop, Reason :: term(), NewStateData :: #state{}}).

handle_event({connection_lost, Reason}, _StateName, State = #state{name = Name, keepalive = KeepAlive, logger = Logger}) ->
handle_event({connection_lost, Reason}, StateName, State = #state{parent = Parent, name = Name, keepalive = KeepAlive, connack_tref = TRef, logger = Logger})
when StateName =:= connected; StateName =:= waiting_for_connack ->

Logger:warning("[Client ~s] Connection lost for: ~p", [Name, Reason]),

%% cancel connack timer first, if connection lost when waiting for connack.
case {StateName, TRef} of
{waiting_for_connack, undefined} -> ok;
{waiting_for_connack, TRef} -> gen_fsm:cancel_timer(TRef);
_ -> ok
end,

%% cancel keepalive
emqttc_keepalive:cancel(KeepAlive),
try_reconnect(Reason, State#state{socket = undefined});

%% tell parent
Parent ! {mqttc, self(), disconnected},

try_reconnect(Reason, State#state{socket = undefined, connack_tref = TRef});

handle_event(Event, StateName, State = #state{name = Name, logger = Logger}) ->
Logger:warning("[Client ~s] Unexpected Event when ~s: ~p", [Name, StateName, Event]),
Expand Down Expand Up @@ -691,6 +726,7 @@ connect(State = #state{name = Name,
receiver = undefined,
proto_state = ProtoState,
keepalive_time = KeepAliveTime,
connack_timeout = ConnAckTimeout,
transport = Transport,
logger = Logger}) ->
Logger:info("[Client ~s]: connecting to ~s:~p", [Name, Host, Port]),
Expand All @@ -699,11 +735,13 @@ connect(State = #state{name = Name,
ProtoState1 = emqttc_protocol:set_socket(ProtoState, Socket),
emqttc_protocol:connect(ProtoState1),
KeepAlive = emqttc_keepalive:new({Socket, send_oct}, KeepAliveTime, ?KEEPALIVE_EVENT),
TRef = gen_fsm:start_timer(ConnAckTimeout * 1000, connack),
Logger:info("[Client ~s] connected with ~s:~p", [Name, Host, Port]),
{next_state, waiting_for_connack, State#state{socket = Socket,
receiver = Receiver,
keepalive = KeepAlive,
proto_state = ProtoState1} };
connack_tref = TRef,
proto_state = ProtoState1}};
{error, Reason} ->
Logger:info("[Client ~s] connection failure: ~p", [Name, Reason]),
try_reconnect(Reason, State)
Expand Down

0 comments on commit 48569d0

Please sign in to comment.