-
Notifications
You must be signed in to change notification settings - Fork 39
/
Copy pathexecute.jl
225 lines (193 loc) · 8.27 KB
/
execute.jl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
"""
    TextCursor{buffered}

Cursor returned by `DBInterface.execute(conn, sql)`. The `buffered` type parameter records
whether the resultset was fetched with `API.storeresult` (`true`) or `API.useresult`
(`false`). The struct is mutable because iteration updates the row bookkeeping fields and
`TextCursors` swaps in the metadata of the next resultset.
"""
mutable struct TextCursor{buffered} <: DBInterface.Cursor
    conn::Connection               # connection the statement was executed on
    sql::String                    # SQL text that produced this cursor
    nfields::Int                   # number of columns in the current resultset
    nrows::Int                     # row count; -1 unless the resultset was buffered
    rows_affected::Int64           # affected-row count when no resultset was produced
    result::API.MYSQL_RES          # handle of the current resultset
    names::Vector{Symbol}          # column names
    types::Vector{Type}            # Julia type of each column
    lookup::Dict{Symbol, Int}      # column name => column index
    current_rownumber::Int         # number of the row most recently produced by `iterate`
    current_resultsetnumber::Int   # resultset counter compared against rows for validity
    mysql_date_and_time::Bool      # forwarded to `juliatype` when mapping column types
end
"""
    TextRow{buffered}

A single row produced while iterating a `TextCursor`. It only holds pointers into the
client library's row buffer, so it is readable only while it is the cursor's current row
(`Tables.getcolumn` checks `rownumber`/`resultsetnumber` against the cursor).
"""
struct TextRow{buffered} <: Tables.AbstractRow
    cursor::TextCursor{buffered}   # cursor that produced this row
    row::Ptr{Ptr{UInt8}}           # one value pointer per column
    lengths::Vector{Culong}        # byte length of each column value
    rownumber::Int                 # cursor row number at the time the row was created
    resultsetnumber::Int           # cursor resultset number at the time the row was created
end
# Raw field accessors for `TextRow`. They go through `getfield` rather than dot syntax;
# presumably because `Tables.AbstractRow` routes property access to column lookup
# (NOTE(review): confirm against Tables.jl).
function getcursor(row::TextRow)
    return getfield(row, :cursor)
end

function getrow(row::TextRow)
    return getfield(row, :row)
end

function getlengths(row::TextRow)
    return getfield(row, :lengths)
end

function getrownumber(row::TextRow)
    return getfield(row, :rownumber)
end

function getresultsetnumber(row::TextRow)
    return getfield(row, :resultsetnumber)
end

# Column names of a row are the names recorded on its owning cursor.
function Tables.columnnames(row::TextRow)
    owner = getcursor(row)
    return owner.names
end
# Nullable columns: a NULL value pointer means SQL NULL and maps to `missing`; any other
# pointer is decoded by the `cast` method for the non-missing type `T`.
function cast(::Type{Union{Missing, T}}, ptr, len) where {T}
    ptr == C_NULL && return missing
    return cast(T, ptr, len)
end
# Decode a BIT column value into `API.Bit`.
#
# MySQL returns BIT(n) values as a binary string of ceil(n / 8) bytes, most significant
# byte first, so every one of the `len` bytes has to be folded into the result; reading
# only the first byte truncates any BIT column wider than 8 bits. An empty value
# (`len == 0`) yields zero, and a single-byte value yields that byte.
function cast(::Type{API.Bit}, ptr, len)
    bits = UInt64(0)
    for i in 1:len
        bits = (bits << 8) | UInt64(unsafe_load(ptr, i))
    end
    return API.Bit(bits)
end
# Binary columns: copy the `len` bytes starting at `ptr` into a freshly allocated vector so
# the result does not alias the client library's row buffer.
function cast(::Type{Vector{UInt8}}, ptr, len)
    bytes = Vector{UInt8}(undef, len)
    GC.@preserve bytes unsafe_copyto!(pointer(bytes), ptr, len)
    return bytes
end
# Text columns: copy the `len` bytes at `ptr` into a new `String`.
#
# Uses the public `unsafe_string` (the same call `casterror` uses) instead of the internal
# `Base._string_n` plus a raw write through `pointer(str)`, which depended on an
# undocumented Base function and wrote into the string without GC protection.
function cast(::Type{String}, ptr, len)
    # `unsafe_string` rejects NULL pointers; an empty value never needs to touch `ptr`.
    len == 0 && return ""
    return unsafe_string(ptr, len)
end
# Decimal columns: materialise the text of the value, then parse it as a `Dec64`.
cast(::Type{Dec64}, ptr, len) = parse(Dec64, cast(String, ptr, len))
# Raise a parse failure for type `T`, quoting the raw text of the offending value.
# Marked `@noinline` so the error path stays out of the callers' bodies.
@noinline function casterror(T, ptr, len)
    raw = unsafe_string(ptr, len)
    error("error parsing $T from \"$raw\"")
end
# Fallback conversion: parse the text of the value as a `T` with Parsers.jl.
#
# `ptr`/`len` describe the bytes of the value inside the client row buffer; they are
# wrapped (not copied) for the duration of the parse. Any value that does not parse
# raises through `casterror`.
function cast(::Type{T}, ptr, len) where {T}
    # An empty value cannot be parsed, and `buf[1]` below would raise an opaque
    # BoundsError; report it like every other parse failure instead.
    len == 0 && casterror(T, ptr, len)
    buf = unsafe_wrap(Array, ptr, len)
    x, code, pos = Parsers.typeparser(T, buf, 1, len, buf[1], Int16(0), Parsers.OPTIONS)
    # a positive return code signals a successful parse
    code > 0 && return x
    casterror(T, ptr, len)
end
# Parsers options carrying the date format (with a `.s` fractional component) that
# `cast(DateTime, ...)` and `cast(DateAndTime, ...)` pass to `Parsers.typeparser`.
const DATETIME_OPTIONS = Parsers.Options(dateformat=dateformat"yyyy-mm-dd HH:MM:SS.s")
# Bytes of the all-zero datetime text; the datetime `cast` methods compare an unparseable
# value against it and map a match to a year-zero value instead of raising.
const ZERO_DATE = collect(codeunits("0000-00-00 00:00:00"))
# DATETIME/TIMESTAMP columns decoded as `DateTime`.
#
# A successful parse returns the value; the all-zero date text maps to `DateTime(0)`.
# An `InexactError` raised while parsing triggers `API.dateandtime_warning()`; any
# exception caught here, and any value that is neither parseable nor the zero date,
# ends in `casterror`.
function cast(::Type{DateTime}, ptr, len)
    bytes = unsafe_wrap(Array, ptr, len)
    try
        parsed, status, _ = Parsers.typeparser(DateTime, bytes, 1, len, bytes[1], Int16(0), DATETIME_OPTIONS)
        status > 0 && return parsed
        bytes == ZERO_DATE && return DateTime(0)
    catch err
        if err isa InexactError
            API.dateandtime_warning()
        end
    end
    casterror(DateTime, ptr, len)
end
# Parsers options with a whole-second date format (no fractional component).
# NOTE(review): not referenced by the code visible in this file — `cast(DateAndTime, ...)`
# uses DATETIME_OPTIONS; confirm whether another file relies on this constant.
const DATEANDTIME_OPTIONS = Parsers.Options(dateformat=dateformat"yyyy-mm-dd HH:MM:SS")
# DATETIME/TIMESTAMP columns decoded as `DateAndTime`, keeping sub-millisecond precision.
#
# The date and whole-second part is parsed as a `DateTime` up to the '.' (if any); the
# digits after the '.' are parsed separately and added to the time part. The all-zero
# date text maps to `DateAndTime(Date(0), Time(0))`; anything else that fails to parse
# raises through `casterror`.
#
# Fixes relative to the previous implementation:
#   * the fractional digits are scaled by how many there are, so ".5" means 500000
#     microseconds and ".123" means 123000 microseconds (they were always taken as a raw
#     microsecond count, which is only right for exactly six digits);
#   * the fraction parser is handed the byte at its own start position rather than the
#     first byte of the whole value;
#   * a fraction that fails to parse, or an empty value, raises a parse error instead of
#     being silently used / raising a BoundsError.
function cast(::Type{DateAndTime}, ptr, len)
    len == 0 && casterror(DateAndTime, ptr, len)
    buf = unsafe_wrap(Array, ptr, len)
    dot = findfirst(==(UInt8('.')), buf)
    x, code, pos = Parsers.typeparser(DateTime, buf, 1, something(dot, len), buf[1], Int16(0), DATETIME_OPTIONS)
    if code > 0
        dt, tm = Date(x), Time(x)
        if dot !== nothing && dot < len
            frac, fcode, fpos = Parsers.typeparser(Int, buf, dot + 1, len, buf[dot + 1], Int16(0), Parsers.OPTIONS)
            fcode > 0 || casterror(DateAndTime, ptr, len)
            ndigits = Int(len) - dot
            if ndigits <= 6
                tm += Dates.Microsecond(frac * 10^(6 - ndigits))
            else
                # more digits than microsecond precision: drop the excess
                tm += Dates.Microsecond(div(frac, 10^(ndigits - 6)))
            end
        end
        return DateAndTime(dt, tm)
    elseif buf == ZERO_DATE
        return DateAndTime(Date(0), Time(0))
    end
    casterror(DateAndTime, ptr, len)
end
# Raise the "stale row" error for row number `i`. Marked `@noinline` so the error path
# stays out of `Tables.getcolumn`.
@noinline function wrongrow(i)
    msg = "row $i is no longer valid; mysql results are forward-only iterators where each row is only valid when iterated"
    throw(ArgumentError(msg))
end
# Typed column access. The row is readable only while it is still the cursor's current
# row of the current resultset; otherwise `wrongrow` raises. The value pointer and byte
# length of column `i` are then handed to the `cast` method for `T`.
function Tables.getcolumn(r::TextRow, ::Type{T}, i::Int, nm::Symbol) where {T}
    owner = getcursor(r)
    rownumber = getrownumber(r)
    if rownumber != owner.current_rownumber || getresultsetnumber(r) != owner.current_resultsetnumber
        wrongrow(rownumber)
    end
    valueptr = unsafe_load(getrow(r), i)
    return cast(T, valueptr, getlengths(r)[i])
end

# Access by column index: the type and name come from the cursor's metadata.
function Tables.getcolumn(r::TextRow, i::Int)
    owner = getcursor(r)
    return Tables.getcolumn(r, owner.types[i], i, owner.names[i])
end

# Access by column name: resolve the index through the cursor's name lookup.
function Tables.getcolumn(r::TextRow, nm::Symbol)
    index = getcursor(r).lookup[nm]
    return Tables.getcolumn(r, index)
end
# Tables.jl and iteration traits for `TextCursor`.

# A cursor is a row table whose schema is the column names/types recorded on it.
Tables.isrowtable(::Type{<:TextCursor}) = true
Tables.schema(c::TextCursor) = Tables.Schema(c.names, c.types)

# `eltype` is defined on the cursor *type*, as the iteration interface expects, so that
# `eltype(typeof(cursor))` reports `TextRow` as well; `eltype(cursor)` keeps returning
# `TextRow` through Base's instance fallback.
Base.eltype(::Type{<:TextCursor}) = TextRow

# Only buffered cursors have a known length; for unbuffered cursors `nrows` is left at
# the -1 that `DBInterface.execute` initialises it with.
Base.IteratorSize(::Type{TextCursor{true}}) = Base.HasLength()
Base.IteratorSize(::Type{TextCursor{false}}) = Base.SizeUnknown()
Base.length(c::TextCursor) = c.nrows
# Produce the next row of the current resultset; the iteration state `i` is the number
# the produced row gets. Returns `nothing` when the cursor has no resultset or the rows
# are exhausted. For unbuffered cursors a NULL row with a non-zero connection errno is
# raised as an `API.Error` rather than treated as the end of the rows.
function Base.iterate(cursor::TextCursor{buffered}, i=1) where {buffered}
    result = cursor.result
    result.ptr == C_NULL && return nothing
    mysql = cursor.conn.mysql
    rowptr = API.fetchrow(mysql, result)
    if rowptr != C_NULL
        lengths = API.fetchlengths(result, cursor.nfields)
        # remember which row is current so `Tables.getcolumn` can reject stale rows
        cursor.current_rownumber = i
        row = TextRow(cursor, rowptr, lengths, i, cursor.current_resultsetnumber)
        return row, i + 1
    end
    if !buffered && API.errno(mysql) != 0
        throw(API.Error(mysql))
    end
    return nothing
end
"""
    DBInterface.lastrowid(c::MySQL.TextCursor)

Return the id of the last inserted row, as reported by the cursor's connection.
The connection is validated with `checkconn` first.
"""
function DBInterface.lastrowid(c::TextCursor)
    connection = c.conn
    checkconn(connection)
    return API.insertid(connection.mysql)
end
"""
    DBInterface.close!(cursor)

Close a cursor by clearing its connection (`clear!`). No more results will be available
from the cursor afterwards.
"""
function DBInterface.close!(c::TextCursor)
    return clear!(c.conn)
end
"""
    DBInterface.execute(conn::MySQL.Connection, sql) => MySQL.TextCursor

Execute the SQL statement `sql` on the database connection `conn`. Parameter binding is
not supported here (passing `params` raises an error); use prepared statements instead,
see `?DBInterface.prepare(conn, sql)`.

Returns a `Cursor` object which iterates resultset rows and satisfies the `Tables.jl`
interface, so results can be sent to any valid sink function (`DataFrame(cursor)`,
`CSV.write("results.csv", cursor)`, etc.).

# Keywords
- `mysql_store_result::Bool=true`: when `false`, the full resultset is not buffered on
  the client after the query executes. This has memory use advantages, though it ties up
  the database server since resultset rows must be fetched one at a time.
- `mysql_date_and_time::Bool=false`: forwarded to `juliatype` when the Julia type of each
  column is chosen, and stored on the returned cursor.
"""
function DBInterface.execute(conn::Connection, sql::AbstractString, params=(); mysql_store_result::Bool=true, mysql_date_and_time::Bool=false)
    checkconn(conn)
    params != () && error("`DBInterface.execute(conn, sql)` does not support parameter binding; see `?DBInterface.prepare(conn, sql)`")
    clear!(conn)
    API.query(conn.mysql, sql)

    # buffered: pull the whole resultset to the client; otherwise stream it row by row
    buffered = mysql_store_result
    result = buffered ? API.storeresult(conn.mysql) : API.useresult(conn.mysql)
    conn.lastexecute = result

    # defaults for "no resultset" / "row count unknown"
    nrows = -1
    nfields = 0
    rows_affected = UInt64(0)

    if result.ptr != C_NULL
        # the statement produced a resultset: collect its column metadata
        if buffered
            nrows = API.numrows(result)
        end
        nfields = API.numfields(result)
        fields = API.fetchfields(result, nfields)
        names = [ccall(:jl_symbol_n, Ref{Symbol}, (Cstring, Csize_t), f.name, f.name_length) for f in fields]
        types = [juliatype(f.field_type, API.notnullable(f), API.isunsigned(f), API.isbinary(f), mysql_date_and_time) for f in fields]
    elseif API.fieldcount(conn.mysql) == 0
        # no resultset and no columns expected: report the affected-row count instead
        rows_affected = API.affectedrows(conn.mysql)
        names = Symbol[]
        types = Type[]
    else
        error("error with mysql resultset columns")
    end

    lookup = Dict(nm => idx for (idx, nm) in enumerate(names))
    affected = Core.bitcast(Int64, rows_affected)
    return TextCursor{buffered}(conn, sql, nfields, nrows, affected, result, names, types, lookup, 0, 1, mysql_date_and_time)
end
"""
    TextCursors{T}

Iterator over the resultsets of a multi-statement query, as returned by
`DBInterface.executemultiple`. It wraps a single `TextCursor{T}` that is yielded once per
resultset after being re-pointed at that resultset.
"""
struct TextCursors{T}
    cursor::TextCursor{T}   # the cursor reused for every resultset
end
# Iteration traits for `TextCursors`.
# `eltype` is defined on the *type*, as the iteration interface expects, so that
# `eltype(typeof(cursors))` is `TextCursor{T}` too; `eltype(cursors)` still returns
# `TextCursor{T}` through Base's instance fallback.
Base.eltype(::Type{TextCursors{T}}) where {T} = TextCursor{T}
# The number of resultsets is only discovered while iterating.
Base.IteratorSize(::Type{<:TextCursors}) = Base.SizeUnknown()
# Iterate the resultsets of a multi-statement query.
#
# The first call yields the wrapped cursor as produced by `DBInterface.execute`. Every
# later call frees the previous resultset, advances the connection to the next one,
# re-points the wrapped cursor at it, and yields the same cursor object again. Returns
# `nothing` when the cursor has no resultset or the connection reports no more results.
#
# Fixes relative to the previous implementation:
#   * `API.nextresult` is no longer called inside `@assert`, so the call cannot be dropped
#     when assertions are disabled;
#   * `lookup` is rebuilt from the new column names, so name-based column access matches
#     the new resultset;
#   * `current_resultsetnumber` is advanced and `current_rownumber` reset, so rows kept
#     from an earlier (now freed) resultset fail the validity check in `Tables.getcolumn`;
#   * a follow-up statement that produces no resultset is handled the same way
#     `DBInterface.execute` handles it, instead of passing a NULL result to
#     `API.numrows`/`API.numfields`.
function Base.iterate(cursor::TextCursors{buffered}, first=true) where {buffered}
    c = cursor.cursor
    c.result.ptr == C_NULL && return nothing
    first && return c, false

    mysql = c.conn.mysql
    finalize(c.result)
    API.moreresults(mysql) || return nothing
    API.nextresult(mysql) === nothing &&
        error("expected another mysql resultset, but none was available")

    c.result = buffered ? API.storeresult(mysql) : API.useresult(mysql)
    # rows handed out for the previous resultset must no longer be readable
    c.current_resultsetnumber += 1
    c.current_rownumber = 0

    if c.result.ptr != C_NULL
        if buffered
            c.nrows = API.numrows(c.result)
        end
        c.nfields = API.numfields(c.result)
        fields = API.fetchfields(c.result, c.nfields)
        c.names = [ccall(:jl_symbol_n, Ref{Symbol}, (Cstring, Csize_t), x.name, x.name_length) for x in fields]
        c.types = [juliatype(x.field_type, API.notnullable(x), API.isunsigned(x), API.isbinary(x), c.mysql_date_and_time) for x in fields]
    elseif API.fieldcount(mysql) == 0
        # statement without a resultset: mirror what `DBInterface.execute` records
        c.nrows = -1
        c.nfields = 0
        c.rows_affected = Core.bitcast(Int64, API.affectedrows(mysql))
        c.names = Symbol[]
        c.types = Type[]
    else
        error("error with mysql resultset columns")
    end
    c.lookup = Dict{Symbol, Int}(nm => i for (i, nm) in enumerate(c.names))
    return c, false
end
# Execute `sql` (which may contain several statements) and wrap the resulting cursor in a
# `TextCursors` iterator that yields one cursor per resultset. Positional `params` and all
# keyword arguments are forwarded unchanged to `DBInterface.execute`.
function DBInterface.executemultiple(conn::Connection, sql::AbstractString, params=(); kw...)
    firstcursor = DBInterface.execute(conn, sql, params; kw...)
    return TextCursors(firstcursor)
end