Skip to content

Streaming queries #124

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Mar 1, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
refactor(streaming): incorporate into mysql.query as kwarg, update do…
…cs, tests
  • Loading branch information
mcmcgrath13 committed Feb 28, 2019
commit 6d446d917e4c09efffadc120f95ba92ed9aa880f
28 changes: 8 additions & 20 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -121,15 +121,9 @@ MySQL.escape(conn::MySQL.Connection, str::String) -> String
```
Escape an SQL statement

#### MySQL.Query (previously MySQL.query)
#### MySQL.query (deprecated)

```julia
MySQL.Query(conn::MySQL.Connection, sql::String; append::Bool=false) => sink
```
Execute an SQL statement and return the results as a MySQL.Query object (see [MySQL.Query](#mysqlquery)).

The results can be materialized as a data sink that implements the Tables.jl interface.
E.g. `MySQL.Query(conn, sql) |> DataFrame` or `MySQL.Query(conn, sql) |> columntable`
Deprecated - see [MySQL.Query](#mysqlquery)

#### MySQL.execute!

Expand Down Expand Up @@ -178,20 +172,14 @@ Alternately, a source implementing the Tables.jl interface can be streamed by ex
MySQL.Query(conn, sql, kwargs...) => MySQL.Query
```

Execute an SQL statement and return a `MySQL.Query` object. Implements the `Tables.jl`
interface. Result rows can be iterated via `Table.rows`.
Execute an SQL statement and return a `MySQL.Query` object. Result rows can be
iterated as NamedTuples via `Table.rows(query)` where `query` is the `MySQL.Query`
object.

#### MySQL.StreamingQuery

```julia
MySQL.StreamingQuery(conn, sql, kwargs...) => MySQL.StreamingQuery
```
Supported Key Word Arguments:
* `streaming` - Defaults to false. If true, length of the result size is unknown as the result is returned row by row. May be more memory efficient.

Execute an SQL statement and return a `MySQL.StreamingQuery` object. This is a lower
level query interface, which does not implement the `Tables.jl` interface. The
`MySQL.StreamingQuery` result must be iterated to process the result. Performing the query
in this way is less memory intensive on MySQL, but the number of returned rows is
unknown until iteration is complete.
To materialize the results as a `DataFrame`, use `MySQL.query(conn, sql) |> DataFrame`.

### Example

Expand Down
23 changes: 9 additions & 14 deletions src/api.jl
Original file line number Diff line number Diff line change
Expand Up @@ -191,20 +191,6 @@ macro c(func, ret, args, vals...)
end
end

# function mysql_library_init(argc=0, argv=C_NULL, groups=C_NULL)
# return ccall((:mysql_library_init, libmariadb),
# Cint,
# (Cint, Ptr{Ptr{UInt8}}, Ptr{Ptr{UInt8}}),
# argc, argv, groups)
# end

# function mysql_library_end()
# return ccall((:mysql_library_end, libmariadb),
# Cvoid,
# (),
# )
# end

"""
Initializes the MYSQL object. Must be called before mysql_real_connect.
Memory allocated by mysql_init can be freed with mysql_close.
Expand Down Expand Up @@ -428,6 +414,9 @@ function mysql_stmt_bind_result(stmtptr, bind::Ptr{MYSQL_BIND})
bind)
end

"""
Submit a query to the server
"""
function mysql_query(mysqlptr::Ptr{Cvoid}, sql::String)
return @c(:mysql_query,
Cint,
Expand All @@ -436,13 +425,19 @@ function mysql_query(mysqlptr::Ptr{Cvoid}, sql::String)
sql)
end

"""
After mysql_query or mysql_real_query used to store result in memory and send all to client
"""
function mysql_store_result(mysqlptr::Ptr{Cvoid})
return @c(:mysql_store_result,
MYSQL_RES,
(Ptr{Cvoid}, ),
mysqlptr)
end

"""
After mysql_query or mysql_real_query used to stream result and send to client row by row
"""
function mysql_use_result(mysqlptr::Ptr{Cvoid})
return @c(:mysql_use_result,
MYSQL_RES,
Expand Down
67 changes: 20 additions & 47 deletions src/types.jl
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,9 @@ function metadata(result::API.MYSQL_RES)
return unsafe_wrap(Array, rawfields, nfields)
end

# implements Table interface
mutable struct Query{hasresult, names, T}
# resulttype is a symbol with values :none :streaming :default
# names and types relate to the returned columns in the query
mutable struct Query{resulttype, names, T}
result::Result
ptr::Ptr{Ptr{Int8}}
ncols::Int
Expand All @@ -76,64 +77,45 @@ Execute an SQL statement and return a `MySQL.Query` object. Result rows can be
iterated as NamedTuples via `Table.rows(query)` where `query` is the `MySQL.Query`
object.

Supported Key Word Arguments:
* `streaming` - Defaults to false. If true, length of the result size is unknown as the result is returned row by row. May be more memory efficient.

To materialize the results as a `DataFrame`, use `MySQL.query(conn, sql) |> DataFrame`.
"""
function Query(conn::Connection, sql::String; kwargs...)
conn.ptr == C_NULL && throw(MySQLInterfaceError("Method called with null connection."))
MySQL.API.mysql_query(conn.ptr, sql) != 0 && throw(MySQLInternalError(conn))
result = MySQL.Result(MySQL.API.mysql_store_result(conn.ptr))
if result.ptr != C_NULL
nrows = MySQL.API.mysql_num_rows(result.ptr)
fields = MySQL.metadata(result.ptr)
names = Tuple(ccall(:jl_symbol_n, Ref{Symbol}, (Ptr{UInt8}, Csize_t), x.name, x.name_length) for x in fields)
T = Tuple{(julia_type(x.field_type, API.notnullable(x), API.isunsigned(x)) for x in fields)...}
hasresult = true
ncols = length(fields)
ptr = MySQL.API.mysql_fetch_row(result.ptr)
elseif API.mysql_field_count(conn.ptr) == 0
result = Result(Int(API.mysql_affected_rows(conn.ptr)))
nrows = ncols = 1
names = (:num_rows_affected,)
T = Tuple{Int}
hasresult = false
ptr = C_NULL

if get(kwargs, :streaming, false)
resulttype = :streaming
result = MySQL.Result(MySQL.API.mysql_use_result(conn.ptr))
else
throw(MySQLInterfaceError("Query expected to produce results but did not."))
resulttype = :default
result = result = MySQL.Result(MySQL.API.mysql_store_result(conn.ptr))
end
return Query{hasresult, names, T}(result, ptr, ncols, nrows)
end


"""
MySQL.StreamingQuery(conn, sql; kwargs...) => MySQL.Query

Execute an SQL statement and return a `MySQL.StreamingQuery` object. Object must
be iterated to return result set. This is lower level functionality to support
streaming processing of large datasets.
"""
function StreamingQuery(conn::Connection, sql::String; kwargs...)
conn.ptr == C_NULL && throw(MySQLInterfaceError("Method called with null connection."))
MySQL.API.mysql_query(conn.ptr, sql) != 0 && throw(MySQLInternalError(conn))
result = MySQL.Result(MySQL.API.mysql_use_result(conn.ptr))
if result.ptr != C_NULL
nrows = MySQL.API.mysql_num_rows(result.ptr)
fields = MySQL.metadata(result.ptr)
names = Tuple(ccall(:jl_symbol_n, Ref{Symbol}, (Ptr{UInt8}, Csize_t), x.name, x.name_length) for x in fields)
T = Tuple{(julia_type(x.field_type, API.notnullable(x), API.isunsigned(x)) for x in fields)...}
hasresult = true
ncols = length(fields)
ptr = MySQL.API.mysql_fetch_row(result.ptr)
elseif API.mysql_field_count(conn.ptr) == 0
result = Result(Int(API.mysql_affected_rows(conn.ptr)))
nrows = ncols = 1
names = (:num_rows_affected,)
T = Tuple{Int}
hasresult = false
resulttype = :none
ptr = C_NULL
else
throw(MySQLInterfaceError("Query expected to produce results but did not."))
end
return StreamingQuery{hasresult, names, T}(result, ptr, ncols)
return Query{resulttype, names, T}(result, ptr, ncols, nrows)
end

Base.IteratorSize(::Type{Query{resulttype, names, T}}) where {resulttype, names, T} = resulttype == :streaming ? Base.SizeUnknown() : Base.HasLength()

Tables.istable(::Type{<:Query}) = true
Tables.rowaccess(::Type{<:Query}) = true
Tables.rows(q::Query) = q
Expand Down Expand Up @@ -165,19 +147,10 @@ function generate_namedtuple(::Type{NamedTuple{names, types}}, q) where {names,
end
end

function Base.iterate(q::Query{hasresult, names, types}, st=1) where {hasresult, names, types}
st > length(q) && return nothing
!hasresult && return (num_rows_affected=Int(q.result.ptr),), 2
function Base.iterate(q::Query{resulttype, names, types}, st=1) where {resulttype, names, types}
st == 1 && resulttype == :none && return (num_rows_affected=Int(q.result.ptr),), 2
q.ptr == C_NULL && return nothing
nt = generate_namedtuple(NamedTuple{names, types}, q)
q.ptr = API.mysql_fetch_row(q.result.ptr)
return nt, st + 1
end

function Base.iterate(q::StreamingQuery{hasresult, names, types}, st=1) where {hasresult, names, types}
q.ptr == C_NULL && return nothing
!hasresult && return (num_rows_affected=Int(q.result.ptr),), 2
nt = generate_namedtuple(NamedTuple{names, types}, q)
q.ptr = API.mysql_fetch_row(q.result.ptr)
return nt, st
end
4 changes: 3 additions & 1 deletion test/runtests.jl
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,9 @@ expected = (
@test res == expected

# Streaming Queries
sres = MySQL.StreamingQuery(conn, "select * from Employee")
sres = MySQL.Query(conn, "select * from Employee", streaming=true)

@test sres.nrows == 0

data = []
for row in sres
Expand Down