Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Transactions #166

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
4 changes: 3 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@ before_script:
- make

script: "make test"

otp_release:
- 17.0-rc1
- 18.0
- 17.5
- R16B03-1
- R16B
- R15B03
Expand Down
15 changes: 14 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,19 @@ APP_NAME=emysql
MODULES=$(shell ls -1 src/*.erl | awk -F[/.] '{ print $$2 }' | sed '$$q;s/$$/,/g')
MAKETIME=$(shell date)

## Check if we are on erlang version that has namespaced types
ERL_NT=$(shell escript ./support/ntype_check.escript)

## Check if we are on erlang version that has erlang:timestamp/0
ERL_TS=$(shell escript ./support/timestamp_check.escript)

ifeq ($(ERL_NT),true)
ERLC_NT_FLAG=-Dnamespaced_types
endif
ifeq ($(ERL_TS),true)
ERLC_TS_FLAG=-Dtimestamp_support
endif

all: crypto_compat app
(cd src;$(MAKE))

Expand Down Expand Up @@ -80,7 +93,7 @@ CT_RUN = ct_run \
CT_SUITES=environment basics conn_mgr

build-tests:
erlc -v -o test/ $(wildcard test/*.erl) -pa ebin/
erlc -v $(ERLC_NT_FLAG) $(ERLC_TS_FLAG) -o test/ $(wildcard test/*.erl) -pa ebin/

test: all build-tests
@mkdir -p logs
Expand Down
24 changes: 23 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,29 @@ If you are looking for the **plain necessities**, you should use the [ejabberd][

#### Transaction

This driver currently does not support transactions.
This driver supports transaction in this way:

Usage:

```erlang
Transaction = fun(Q) ->
R1 = Q(<<"SELECT * from some_table">>, []),
R2 = Q(<<"SELECT * from some_other_table">>, []),
R3 = Q(<"INSERT INTO ...">>, []),
....

{ok, SomeResult}
end,

{ok, Result} = emysql:transaction(Pool, Transaction, Timeout).

The transaction fun receive one argument, wich is also a function.
That is used to perform queries. Only textual queries allowed
(no prepared statments with arguments).

The result is the return value of the transaction function.
If the function crashes, transaction is aborted. That is the only
way to abort the transaction: throwing an error.

For **mnesia-style transactions**, one of the multiple '[erlang-mysql-driver][22]s' may suite you best. There are [quite many][16] branches of it out there, and they are based on the same project as the ejabberd driver. To learn more about out the differences between the drivers, see the [mysql driver history][History].

Expand Down
130 changes: 64 additions & 66 deletions include/emysql.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,16 @@
%% Jacob Vorreuter <[email protected]>,
%% Henning Diedrich <[email protected]>,
%% Eonblast Corporation <http://www.eonblast.com>
%%
%%
%% Permission is hereby granted, free of charge, to any person
%% obtaining a copy of this software and associated documentation
%% files (the "Software"),to deal in the Software without restric-
%% tion, including without limitation the rights to use, copy,
%% tion, including without limitation the rights to use, copy,
%% modify, merge, publish, distribute, sublicense, and/or sell
%% copies of the Software, and to permit persons to whom the
%% Software is furnished to do so, subject to the following
%% Software is furnished to do so, subject to the following
%% conditions:
%%
%%
%% The above copyright notice and this permission notice shall be
%% included in all copies or substantial portions of the Software.
%%
Expand All @@ -25,89 +25,88 @@
%% FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
%% OTHER DEALINGS IN THE SOFTWARE.


-record(pool, {pool_id :: atom(),
size :: number(),
user :: string(),
password :: string(),
host :: string(),
port :: number(),
database :: string(),
-record(pool, {pool_id :: atom(),
size :: number(),
user :: string(),
password :: string(),
host :: string(),
port :: number(),
database :: string(),
encoding :: utf8 | latin1 | {utf8, utf8_unicode_ci} | {utf8, utf8_general_ci},
available=queue:new() :: queue(),
locked=gb_trees:empty() :: gb_tree(),
waiting=queue:new() :: queue(),
start_cmds=[] :: string(),
conn_test_period=0 :: number(),
available=queue:new() :: emysql:t_queue(),
locked=gb_trees:empty() :: emysql:t_gb_tree(),
waiting=queue:new() :: emysql:t_queue(),
start_cmds=[] :: string(),
conn_test_period=0 :: number(),
connect_timeout=infinity :: number() | infinity,
warnings=false :: boolean()}).

-record(emysql_connection, {id :: string(),
pool_id :: atom(),
-record(emysql_connection, {id :: string(),
pool_id :: atom(),
encoding :: atom(), % maybe could be latin1 | utf8 ?
socket :: inet:socket(),
version :: number(),
thread_id :: number(),
caps :: number(),
language :: number,
prepared=gb_trees:empty(),
locked_at :: number(),
alive=true :: boolean(),
test_period=0 :: number(),
last_test_time=0 :: number(),
socket :: inet:socket(),
version :: number(),
thread_id :: number(),
caps :: number(),
language :: number,
prepared=gb_trees:empty(),
locked_at :: number(),
alive=true :: boolean(),
test_period=0 :: number(),
last_test_time=0 :: number(),
monitor_ref :: reference(),
warnings=false :: boolean()}).

-record(greeting, {protocol_version :: number(),
server_version :: binary(),
thread_id :: number(),
salt1 :: binary(),
salt2 :: binary(),
caps :: number(),
caps_high :: number(),
language :: number(),
status :: number(),
seq_num :: number(),
-record(greeting, {protocol_version :: number(),
server_version :: binary(),
thread_id :: number(),
salt1 :: binary(),
salt2 :: binary(),
caps :: number(),
caps_high :: number(),
language :: number(),
status :: number(),
seq_num :: number(),
plugin :: binary()}).

-record(field, {seq_num :: number(),
catalog :: binary(),
db :: binary(),
table :: binary(),
org_table :: binary(),
name :: binary(),
org_name :: binary(),
type :: number(),
default :: number(),
charset_nr :: number(),
length :: number(),
flags :: number(),
decimals :: number(),
-record(field, {seq_num :: number(),
catalog :: binary(),
db :: binary(),
table :: binary(),
org_table :: binary(),
name :: binary(),
org_name :: binary(),
type :: number(),
default :: number(),
charset_nr :: number(),
length :: number(),
flags :: number(),
decimals :: number(),
decoder :: fun()}).
-record(packet, {size :: number(),
seq_num :: number(),
-record(packet, {size :: number(),
seq_num :: number(),
data :: binary()}).
-record(ok_packet, {seq_num :: number(),
affected_rows :: number(),
insert_id :: number(),
status :: number(),
warning_count :: number(),
-record(ok_packet, {seq_num :: number(),
affected_rows :: number(),
insert_id :: number(),
status :: number(),
warning_count :: number(),
msg :: string()
| {error, string(), unicode:latin1_chardata() | unicode:chardata() | unicode:external_chardata()}
| {incomplete, string(), binary()}}).

% It's unfortunate that error_packet's status is binary when the status of other
% packets is a number.
-record(error_packet, {seq_num :: number(),
code :: number(),
status :: binary(),
-record(error_packet, {seq_num :: number(),
code :: number(),
status :: binary(),
msg :: [byte()]}).

-record(eof_packet, {seq_num :: number(),
status :: number(),
-record(eof_packet, {seq_num :: number(),
status :: number(),
warning_count :: number()}). % extended to mySQL 4.1+ format

-record(result_packet, {seq_num :: number(),
-record(result_packet, {seq_num :: number(),
field_list :: list(),
rows, extra}).

Expand Down Expand Up @@ -179,4 +178,3 @@
% we discovered that the new statement returns a different
% number of result set columns.
-define(SERVER_STATUS_METADATA_CHANGED, 1024).

2 changes: 2 additions & 0 deletions rebar.config
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
% -*- Erlang -*-
% vim: ts=4 sw=4 et ft=erlang
{erl_opts, [
{platform_define, "^[0-9]+", namespaced_types},
nowarn_deprecated_type
]}.
{pre_hooks,[

{"linux|bsd|darwin|solaris", compile, "escript ./support/crypto_compat.escript"},
{"win32", compile, "escript.exe support/crypto_compat.escript"}
]}.
14 changes: 14 additions & 0 deletions rebar.config.script
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
{exports, ExportList} = lists:keyfind(exports,1,erlang:module_info()),
Check = lists:member({timestamp,0},ExportList),
case Check of
true ->
case lists:keyfind(erl_opts, 1, CONFIG) of
false ->
CONFIG ++ [{erl_opts,[{d,timestamp_support}]}];
{erl_opts, Opts} ->
NewOpts = {erl_opts, Opts ++ [{d,timestamp_support}]},
lists:keyreplace(erl_opts, 1, CONFIG, NewOpts)
end;
false ->
CONFIG
end.
53 changes: 42 additions & 11 deletions src/emysql.erl
Original file line number Diff line number Diff line change
Expand Up @@ -105,12 +105,12 @@
add_pool/9,
add_pool/8, remove_pool/1, increment_pool_size/2, decrement_pool_size/2
]).

%% Interaction API
%% Used to interact with the database.
%% Used to interact with the database.
-export([
prepare/2,
execute/2, execute/3, execute/4, execute/5,
execute/2, execute/3, execute/4, execute/5, transaction/3,
default_timeout/0
]).

Expand All @@ -136,6 +136,23 @@
% for record and constant defines
-include("emysql.hrl").

-export_type([
t_gb_tree/0,
t_queue/0,
t_dict/0
]).

-ifdef(namespaced_types).
-type t_gb_tree() :: gb_trees:tree().
-type t_queue() :: queue:queue().
-type t_dict() :: dict:dict().
-else.
-type t_gb_tree() :: gb_tree().
-type t_queue() :: queue().
-type t_dict() :: dict().
-endif.


%% @spec start() -> ok
%% @doc Start the Emysql application.
%%
Expand Down Expand Up @@ -246,8 +263,8 @@ config_ok(#pool{pool_id=PoolId,size=Size,user=User,password=Password,host=Host,p
config_ok(_BadOptions) ->
erlang:error(badarg).

encoding_ok(Enc) when is_atom(Enc) -> ok;
encoding_ok({Enc, Coll}) when is_atom(Enc), is_atom(Coll) -> ok;
encoding_ok(Enc) when is_atom(Enc) -> ok;
encoding_ok({Enc, Coll}) when is_atom(Enc), is_atom(Coll) -> ok;
encoding_ok(_) -> erlang:error(badarg).

%% Creates a pool record, opens n=Size connections and calls
Expand All @@ -267,15 +284,15 @@ add_pool(PoolId, Options) when is_list(Options) ->
Warnings = proplists:get_value(warnings, Options, false),
add_pool(#pool{pool_id=PoolId,size=Size, user=User, password=Password,
host=Host, port=Port, database=Database,
encoding=Encoding, start_cmds=StartCmds,
encoding=Encoding, start_cmds=StartCmds,
connect_timeout=ConnectTimeout, warnings=Warnings}).

add_pool(#pool{pool_id=PoolId,size=Size,user=User,password=Password,host=Host,port=Port,
database=Database,encoding=Encoding,start_cmds=StartCmds,
connect_timeout=ConnectTimeout,warnings=Warnings}=PoolSettings)->
config_ok(PoolSettings),
case emysql_conn_mgr:has_pool(PoolId) of
true ->
true ->
{error,pool_already_exists};
false ->
Pool = #pool{
Expand Down Expand Up @@ -328,8 +345,8 @@ add_pool(PoolId, Size, User, Password, Host, Port, Database, Encoding) ->
add_pool(PoolId, Size, User, Password, Host, Port, Database, Encoding, StartCmds) ->
add_pool(PoolId, Size, User, Password, Host, Port, Database, Encoding, StartCmds, infinity).

add_pool(PoolId, Size, User, Password, Host, Port, Database,
Encoding, StartCmds, ConnectTimeout)->
add_pool(PoolId, Size, User, Password, Host, Port, Database,
Encoding, StartCmds, ConnectTimeout)->
add_pool(PoolId,[{size,Size},{user,User},{password,Password},
{host,Host},{port,Port},{database,Database},
{encoding,Encoding},{start_cmds,StartCmds},
Expand Down Expand Up @@ -533,6 +550,11 @@ execute(PoolId, Query, Timeout) when (is_list(Query) orelse is_binary(Query)) an
execute(PoolId, StmtName, Timeout) when is_atom(StmtName), (is_integer(Timeout) orelse Timeout == infinity) ->
execute(PoolId, StmtName, [], Timeout).

transaction(PoolId, Fun, Timeout) when is_function(Fun) ->
Connection = emysql_conn_mgr:wait_for_connection(PoolId),
monitor_work(Connection, Timeout, {transaction, Connection, Fun}).


%% @spec execute(PoolId, Query|StmtName, Args, Timeout) -> Result | [Result]
%% PoolId = atom()
%% Query = binary() | string()
Expand Down Expand Up @@ -669,7 +691,7 @@ result_type(#eof_packet{}) -> eof.
-spec as_dict(Result) -> Dict
when
Result :: #result_packet{},
Dict :: dict().
Dict :: t_dict().
as_dict(Res) -> emysql_conv:as_dict(Res).


Expand Down Expand Up @@ -759,7 +781,16 @@ monitor_work(Connection0, Timeout, Args) when is_record(Connection0, emysql_conn
{Pid, Mref} = spawn_monitor(
fun() ->
put(query_arguments, Args),
Parent ! {self(), apply(fun emysql_conn:execute/3, Args)}
%Parent ! {self(), apply(fun emysql_conn:execute/3, Args)}
case Args of
{transaction, _MaybeOtherConnection, Fun} ->
#ok_packet{} = emysql_conn:execute(Connection, <<"START TRANSACTION;">>, []),
Result = Fun(fun(Query, Params) -> emysql_conn:execute(Connection, Query, Params) end),
#ok_packet{} = emysql_conn:execute(Connection, <<"COMMIT;">>, []),
Parent ! {self(), Result};
_ ->
Parent ! {self(), apply(fun emysql_conn:execute/3, Args)}
end
end),
receive
{'DOWN', Mref, process, Pid, tcp_connection_closed} ->
Expand Down
Loading