forked from vapor/postgres-nio
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathPostgresConnection+Database.swift
124 lines (106 loc) · 5.48 KB
/
PostgresConnection+Database.swift
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
import NIOCore
import Logging
import struct Foundation.Data
extension PostgresConnection: PostgresDatabase {
    /// Executes a legacy `PostgresRequest` on this connection.
    ///
    /// Only `PostgresCommands` values are supported; passing any other
    /// `PostgresRequest` implementation traps via `preconditionFailure`.
    ///
    /// - Parameters:
    ///   - request: The command to run. Must be a `PostgresCommands` value.
    ///   - logger: Logger handed to the underlying connection for this request.
    /// - Returns: A future that completes when the command has finished. Any
    ///   failure is rethrown as `error.asAppropriatePostgresError`.
    public func send(
        _ request: PostgresRequest,
        logger: Logger
    ) -> EventLoopFuture<Void> {
        guard let command = request as? PostgresCommands else {
            preconditionFailure("\(#function) requires an instance of PostgresCommands. This will be a compile-time error in the future.")
        }

        let resultFuture: EventLoopFuture<Void>
        switch command {
        case .query(let query, let binds, let onMetadata, let onRow):
            resultFuture = self.underlying.query(query, binds, logger: logger).flatMap { stream in
                let lookupTable = PostgresConnection.makeLookupTable(for: stream)
                return stream.iterateRowsWithoutBackpressureOption(lookupTable: lookupTable, onRow: onRow).map { _ in
                    // The metadata callback fires only after every row has been delivered.
                    // NOTE(review): traps if the command tag cannot be parsed into
                    // `PostgresQueryMetadata` — confirm the initializer cannot fail here.
                    onMetadata(PostgresQueryMetadata(string: stream.commandTag)!)
                }
            }

        case .queryAll(let query, let binds, let onResult):
            resultFuture = self.underlying.query(query, binds, logger: logger).flatMap { rows in
                let lookupTable = PostgresConnection.makeLookupTable(for: rows)
                return rows.all().map { allrows in
                    let r = allrows.map { psqlRow -> PostgresRow in
                        let columns = psqlRow.data.map {
                            PostgresMessage.DataRow.Column(value: $0)
                        }
                        return PostgresRow(dataRow: .init(columns: columns), lookupTable: lookupTable)
                    }
                    // NOTE(review): same force-unwrap as in `.query` — traps on an unparsable command tag.
                    onResult(.init(metadata: PostgresQueryMetadata(string: rows.commandTag)!, rows: r))
                }
            }

        case .prepareQuery(let request):
            // Use the caller-supplied logger, like every other case, rather than the connection's own.
            resultFuture = self.underlying.prepareStatement(request.query, with: request.name, logger: logger).map {
                request.prepared = PreparedQuery(underlying: $0, database: self)
            }

        case .executePreparedStatement(let preparedQuery, let binds, let onRow):
            resultFuture = self.underlying.execute(preparedQuery.underlying, binds, logger: logger).flatMap { rows in
                // Without a lookup table no rows are handed to `onRow`; the future succeeds immediately.
                guard let lookupTable = preparedQuery.lookupTable else {
                    return self.eventLoop.makeSucceededFuture(())
                }
                return rows.iterateRowsWithoutBackpressureOption(lookupTable: lookupTable, onRow: onRow)
            }
        }

        return resultFuture.flatMapErrorThrowing { error in
            throw error.asAppropriatePostgresError
        }
    }

    /// Calls `closure` with this connection and returns its result unchanged.
    ///
    /// - Parameter closure: Work to perform with the connection.
    /// - Returns: The future produced by `closure`.
    public func withConnection<T>(_ closure: (PostgresConnection) -> EventLoopFuture<T>) -> EventLoopFuture<T> {
        closure(self)
    }

    /// Builds the `PostgresRow.LookupTable` for the row description of `stream`.
    ///
    /// Each column of the stream's row description is converted into a
    /// `PostgresMessage.RowDescription.Field`; the result format is `[.binary]`.
    private static func makeLookupTable(for stream: PSQLRowStream) -> PostgresRow.LookupTable {
        let fields = stream.rowDescription.map { column in
            PostgresMessage.RowDescription.Field(
                name: column.name,
                tableOID: UInt32(column.tableOID),
                columnAttributeNumber: column.columnAttributeNumber,
                dataType: PostgresDataType(UInt32(column.dataType.rawValue)),
                dataTypeSize: column.dataTypeSize,
                dataTypeModifier: column.dataTypeModifier,
                formatCode: .init(psqlFormatCode: column.format)
            )
        }
        return PostgresRow.LookupTable(rowDescription: .init(fields: fields), resultFormat: [.binary])
    }
}
/// The commands accepted by `PostgresConnection.send(_:logger:)`.
///
/// The type conforms to `PostgresRequest` only so that it can be passed
/// through that API; the protocol requirements themselves must never be
/// invoked and trap if they are.
internal enum PostgresCommands: PostgresRequest {
    /// Runs `query` with `binds`, passing rows to `onRow` and the query
    /// metadata to `onMetadata`.
    case query(
        query: String,
        binds: [PostgresData],
        onMetadata: (PostgresQueryMetadata) -> Void = { _ in },
        onRow: (PostgresRow) throws -> Void
    )

    /// Runs `query` with `binds` and passes the complete result to `onResult`.
    case queryAll(
        query: String,
        binds: [PostgresData],
        onResult: (PostgresQueryResult) -> Void
    )

    /// Prepares the statement described by `request`.
    case prepareQuery(request: PrepareQueryRequest)

    /// Executes a previously prepared `query` with `binds`, passing rows to `onRow`.
    case executePreparedStatement(
        query: PreparedQuery,
        binds: [PostgresData],
        onRow: (PostgresRow) throws -> Void
    )

    // MARK: - PostgresRequest (intentionally unusable)

    /// Not supported for this type; always traps.
    func respond(to message: PostgresMessage) throws -> [PostgresMessage]? {
        fatalError("This function must not be called")
    }

    /// Not supported for this type; always traps.
    func start() throws -> [PostgresMessage] {
        fatalError("This function must not be called")
    }

    /// Not supported for this type; always traps.
    func log(to logger: Logger) {
        fatalError("This function must not be called")
    }
}
extension PSQLRowStream {
    /// Feeds every row of this stream to `onRow` as a `PostgresRow`.
    ///
    /// - Parameters:
    ///   - lookupTable: Lookup table attached to each `PostgresRow` that is created.
    ///   - onRow: Callback invoked once per row; an error it throws is
    ///     propagated out of the per-row closure handed to `self.onRow`.
    /// - Returns: The future returned by `self.onRow`.
    func iterateRowsWithoutBackpressureOption(
        lookupTable: PostgresRow.LookupTable,
        onRow: @escaping (PostgresRow) throws -> ()
    ) -> EventLoopFuture<Void> {
        return self.onRow { incoming in
            // Re-wrap each raw cell as a legacy `DataRow.Column`.
            let cells = incoming.data.map { cell in
                PostgresMessage.DataRow.Column(value: cell)
            }
            try onRow(PostgresRow(dataRow: .init(columns: cells), lookupTable: lookupTable))
        }
    }
}