forked from vapor/postgres-nio
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathPostgresConnection+Database.swift
99 lines (83 loc) · 4.09 KB
/
PostgresConnection+Database.swift
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
import Logging
import struct Foundation.Data
extension PostgresConnection: PostgresDatabase {
public func send(
_ request: PostgresRequest,
logger: Logger
) -> EventLoopFuture<Void> {
guard let command = request as? PostgresCommands else {
preconditionFailure("\(#function) requires an instance of PostgresCommands. This will be a compile-time error in the future.")
}
let resultFuture: EventLoopFuture<Void>
switch command {
case .query(let query, let binds, let onMetadata, let onRow):
resultFuture = self.underlying.query(query, binds, logger: logger).flatMap { rows in
let fields = rows.rowDescription.map { column in
PostgresMessage.RowDescription.Field(
name: column.name,
tableOID: UInt32(column.tableOID),
columnAttributeNumber: column.columnAttributeNumber,
dataType: PostgresDataType(UInt32(column.dataType.rawValue)),
dataTypeSize: column.dataTypeSize,
dataTypeModifier: column.dataTypeModifier,
formatCode: .init(psqlFormatCode: column.formatCode)
)
}
let lookupTable = PostgresRow.LookupTable(rowDescription: .init(fields: fields), resultFormat: [.binary])
return rows.iterateRowsWithoutBackpressureOption(lookupTable: lookupTable, onRow: onRow).map { _ in
onMetadata(PostgresQueryMetadata(string: rows.commandTag)!)
}
}
case .prepareQuery(let request):
resultFuture = self.underlying.prepareStatement(request.query, with: request.name, logger: self.logger).map {
request.prepared = PreparedQuery(underlying: $0, database: self)
}
case .executePreparedStatement(let preparedQuery, let binds, let onRow):
resultFuture = self.underlying.execute(preparedQuery.underlying, binds, logger: logger).flatMap { rows in
guard let lookupTable = preparedQuery.lookupTable else {
return self.eventLoop.makeSucceededFuture(())
}
return rows.iterateRowsWithoutBackpressureOption(lookupTable: lookupTable, onRow: onRow)
}
}
return resultFuture.flatMapErrorThrowing { error in
throw error.asAppropriatePostgresError
}
}
public func withConnection<T>(_ closure: (PostgresConnection) -> EventLoopFuture<T>) -> EventLoopFuture<T> {
closure(self)
}
}
internal enum PostgresCommands: PostgresRequest {
case query(query: String,
binds: [PostgresData],
onMetadata: (PostgresQueryMetadata) -> () = { _ in },
onRow: (PostgresRow) throws -> ())
case prepareQuery(request: PrepareQueryRequest)
case executePreparedStatement(query: PreparedQuery, binds: [PostgresData], onRow: (PostgresRow) throws -> ())
func respond(to message: PostgresMessage) throws -> [PostgresMessage]? {
fatalError("This function must not be called")
}
func start() throws -> [PostgresMessage] {
fatalError("This function must not be called")
}
func log(to logger: Logger) {
fatalError("This function must not be called")
}
}
extension PSQLRows {
func iterateRowsWithoutBackpressureOption(lookupTable: PostgresRow.LookupTable, onRow: @escaping (PostgresRow) throws -> ()) -> EventLoopFuture<Void> {
self.onRow { psqlRow in
let columns = psqlRow.data.map { psqlData in
PostgresMessage.DataRow.Column(value: psqlData.bytes)
}
let row = PostgresRow(dataRow: .init(columns: columns), lookupTable: lookupTable)
do {
try onRow(row)
return self.eventLoop.makeSucceededFuture(Void())
} catch {
return self.eventLoop.makeFailedFuture(error)
}
}
}
}