Skip to content

Commit 3474089

Browse files
committed
data + message refactor
1 parent 2c311c1 commit 3474089

File tree

68 files changed

+1029
-1172
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

68 files changed

+1029
-1172
lines changed

Sources/PostgreSQL/PostgreSQLDataEncoder.swift renamed to Sources/PostgreSQL/Codable/PostgreSQLDataEncoder.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ public struct PostgreSQLDataEncoder {
2222
let context = _PostgreSQLDataEncoderContext()
2323
try encodable.encode(to: _PostgreSQLDataEncoder(context))
2424
guard let data = context.data else {
25-
throw PostgreSQLError(identifier: "dataEncode", reason: "Could not convert to `PostgreSQLData`: \(encodable)", source: .capture())
25+
throw PostgreSQLError(identifier: "dataEncode", reason: "Could not convert to `PostgreSQLData`: \(encodable)")
2626
}
2727
return data
2828
}
@@ -87,7 +87,7 @@ private struct _PostgreSQLDataSingleValueEncoder: SingleValueEncodingContainer {
8787

8888
/// See `SingleValueEncodingContainer`.
8989
mutating func encodeNil() throws {
90-
context.data = PostgreSQLData(type: .void, format: .binary, data: nil)
90+
context.data = PostgreSQLData(null: .void)
9191
}
9292

9393
/// See `SingleValueEncodingContainer`.

Sources/PostgreSQL/PostgreSQLRowDecoder.swift renamed to Sources/PostgreSQL/Codable/PostgreSQLRowDecoder.swift

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ private final class _PostgreSQLRowDecoder: Decoder {
5656

5757
func require(key: CodingKey) throws -> PostgreSQLData {
5858
guard let data = get(key: key) else {
59-
throw PostgreSQLError(identifier: "decode", reason: "No value found at key: \(key)", source: .capture())
59+
throw PostgreSQLError(identifier: "decode", reason: "No value found at key: \(key)")
6060
}
6161
return data
6262
}
@@ -110,8 +110,7 @@ private struct PostgreSQLRowKeyedDecodingContainer<K>: KeyedDecodingContainerPro
110110
reason: "Unsupported decodable type: \(type)",
111111
suggestedFixes: [
112112
"Conform \(type) to PostgreSQLDataCustomConvertible"
113-
],
114-
source: .capture()
113+
]
115114
)
116115
}
117116
return try convertible.convertFromPostgreSQLData(decoder.require(key: key)) as! T
@@ -135,7 +134,6 @@ private func unsupported() -> PostgreSQLError {
135134
reason: "PostgreSQL rows only support a flat, keyed structure `[String: T]`",
136135
suggestedFixes: [
137136
"You can conform nested types to `PostgreSQLJSONType` or `PostgreSQLArrayType`. (Nested types must be `PostgreSQLDataCustomConvertible`.)"
138-
],
139-
source: .capture()
137+
]
140138
)
141139
}

Sources/PostgreSQL/PostgreSQLRowEncoder.swift renamed to Sources/PostgreSQL/Codable/PostgreSQLRowEncoder.swift

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ private struct _PostgreSQLRowKeyedEncodingContainer<K>: KeyedEncodingContainerPr
6060
self.encoder.data[col] = try value.convertToPostgreSQLData()
6161
}
6262

63-
mutating func encodeNil(forKey key: K) throws { try set(key, to: PostgreSQLData(type: .void, data: nil)) }
63+
mutating func encodeNil(forKey key: K) throws { try set(key, to: PostgreSQLData(null: .void)) }
6464
mutating func encode(_ value: Bool, forKey key: K) throws { try set(key, to: value) }
6565
mutating func encode(_ value: Int, forKey key: K) throws { try set(key, to: value) }
6666
mutating func encode(_ value: Int16, forKey key: K) throws { try set(key, to: value) }
@@ -96,7 +96,7 @@ private struct _PostgreSQLRowKeyedEncodingContainer<K>: KeyedEncodingContainerPr
9696
try encode(value, forKey: key)
9797
} else {
9898
if let convertibleType = T.self as? PostgreSQLDataConvertible.Type {
99-
try set(key, to: PostgreSQLData(type: convertibleType.postgreSQLDataType, data: nil))
99+
try set(key, to: PostgreSQLData(null: convertibleType.postgreSQLDataType))
100100
} else {
101101
try encodeNil(forKey: key)
102102
}
@@ -111,8 +111,7 @@ private struct _PostgreSQLRowKeyedEncodingContainer<K>: KeyedEncodingContainerPr
111111
reason: "Unsupported encodable type: \(type)",
112112
suggestedFixes: [
113113
"Conform \(type) to PostgreSQLDataCustomConvertible"
114-
],
115-
source: .capture()
114+
]
116115
)
117116
}
118117
try set(key, to: convertible)

Sources/PostgreSQL/Data/PostgreSQLColumn.swift renamed to Sources/PostgreSQL/Column/PostgreSQLColumn.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
public struct PostgreSQLColumn: Hashable, Equatable {
33
/// The table this column belongs to.
44
public var tableOID: UInt32
5-
5+
66
/// The column's name.
77
public var name: String
88
}
@@ -23,7 +23,7 @@ extension Dictionary where Key == PostgreSQLColumn {
2323
}
2424
return nil
2525
}
26-
26+
2727
/// Access a `Value` from this dictionary keyed by `PostgreSQLColumn`s
2828
/// using a field (column) name and entity (table) name.
2929
public func value(forTableOID tableOID: UInt32, atColumn column: String) -> Value? {
Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
extension Query where Database == PostgreSQLDatabase {
2+
public struct ColumnType {
3+
/// `BOOL`.
4+
public static var bool: ColumnType {
5+
return .init(name: "BOOL")
6+
}
7+
8+
/// `CHAR(n)`
9+
///
10+
/// - parameters:
11+
/// - length: Maximum characters to allow.
12+
public static func char(_ length: Int) -> ColumnType {
13+
return .init(name: "CHAR", parameters: [length.description])
14+
}
15+
16+
/// `VARCHAR(n)`
17+
///
18+
/// - parameters:
19+
/// - length: Maximum characters to allow.
20+
public static func varchar(_ length: Int) -> ColumnType {
21+
return .init(name: "VARCHAR", parameters: [length.description])
22+
}
23+
24+
/// `TEXT`
25+
public static var text: ColumnType {
26+
return .init(name: "TEXT")
27+
}
28+
29+
/// `SMALLINT`
30+
public static var smallint: ColumnType {
31+
return .init(name: "SMALLINT")
32+
}
33+
34+
/// `INT`
35+
public static var int: ColumnType {
36+
return .init(name: "INT")
37+
}
38+
39+
/// `BIGINT`
40+
public static var bigint: ColumnType {
41+
return .init(name: "BIGINT")
42+
}
43+
44+
/// `SMALLSERIAL`
45+
public static var smallserial: ColumnType {
46+
return .init(name: "SMALL SERIAL")
47+
}
48+
49+
/// `SERIAL`
50+
public static var serial: ColumnType {
51+
return .init(name: "SERIAL")
52+
}
53+
54+
/// `BIGSERIAL`
55+
public static var bigserial: ColumnType {
56+
return .init(name: "BIGSERIAL")
57+
}
58+
59+
/// `REAL`
60+
public static var real: ColumnType {
61+
return .init(name: "REAL")
62+
}
63+
64+
/// `DOUBLE PRECISION`
65+
public static var double: ColumnType {
66+
return .init(name: "DOUBLE PRECISION")
67+
}
68+
69+
/// `DATE`
70+
public static var date: ColumnType {
71+
return .init(name: "DATE")
72+
}
73+
74+
/// `TIMESTAMP`
75+
public static var timestamp: ColumnType {
76+
return .init(name: "TIMESTAMP")
77+
}
78+
79+
public enum Default {
80+
case computed(Query.DML.ComputedColumn)
81+
case unescaped(String)
82+
}
83+
84+
public var name: String
85+
public var parameters: [String]
86+
public var primaryKey: Bool
87+
public var nullable: Bool
88+
public var generatedIdentity: Bool
89+
public var `default`: Default?
90+
91+
public init(name: String, parameters: [String] = [], primaryKey: Bool = false, nullable: Bool = false, generatedIdentity: Bool = false, default: Default? = nil) {
92+
self.name = name
93+
self.parameters = parameters
94+
self.primaryKey = primaryKey
95+
self.nullable = nullable
96+
self.generatedIdentity = generatedIdentity
97+
self.default = `default`
98+
}
99+
100+
public init(_ dataType: PostgreSQLDataType, parameters: [String] = [], primaryKey: Bool = false, nullable: Bool = false, generatedIdentity: Bool = false, default: Default? = nil) {
101+
self.name = dataType.knownSQLName ?? "VOID"
102+
self.parameters = parameters
103+
self.primaryKey = primaryKey
104+
self.nullable = nullable
105+
self.generatedIdentity = generatedIdentity
106+
self.default = `default`
107+
}
108+
}
109+
}
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
extension ChannelPipeline {
2+
/// Adds PostgreSQL message encoder and decoder to the channel pipeline.
3+
///
4+
/// - parameters:
5+
/// - first: If `true`, adds the handlers first. Defaults to `false`.
6+
/// - returns: A future signaling completion.
7+
public func addPostgreSQLClientHandlers(first: Bool = false) -> Future<Void> {
8+
return addHandlers([PostgreSQLMessageEncoder(), PostgreSQLMessageDecoder()], first: first)
9+
}
10+
}
Lines changed: 18 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,20 @@
1-
import Async
2-
import NIO
3-
import NIOOpenSSL
4-
51
extension PostgreSQLConnection {
6-
@available(*, deprecated, message: "Use `.connect(to:...)` instead.")
72
public static func connect(
83
hostname: String = "localhost",
94
port: Int = 5432,
10-
transport: PostgreSQLTransportConfig = .cleartext,
5+
transport: TransportConfig = .cleartext,
116
on worker: Worker,
127
onError: @escaping (Error) -> ()
13-
) throws -> Future<PostgreSQLConnection> {
8+
) throws -> Future<PostgreSQLConnection> {
149
return try connect(to: .tcp(hostname: hostname, port: port), transport: transport, on: worker, onError: onError)
1510
}
1611

17-
/// Connects to a PostgreSQL server using a TCP socket.
1812
public static func connect(
19-
to serverAddress: PostgreSQLDatabaseConfig.ServerAddress = .default,
20-
transport: PostgreSQLTransportConfig = .cleartext,
13+
to serverAddress: ServerAddress,
14+
transport: TransportConfig = .cleartext,
2115
on worker: Worker,
2216
onError: @escaping (Error) -> ()
23-
) throws -> Future<PostgreSQLConnection> {
17+
) throws -> Future<PostgreSQLConnection> {
2418
let handler = QueueHandler<PostgreSQLMessage, PostgreSQLMessage>(on: worker, onError: onError)
2519
let bootstrap = ClientBootstrap(group: worker.eventLoop)
2620
// Enable SO_REUSEADDR.
@@ -30,39 +24,25 @@ extension PostgreSQLConnection {
3024
channel.pipeline.add(handler: handler)
3125
}
3226
}
33-
34-
let connectedBootstrap: Future<Channel>
35-
switch serverAddress {
36-
case let .tcp(hostname, port):
37-
connectedBootstrap = bootstrap.connect(host: hostname, port: port)
38-
case let .unixSocket(socketPath):
39-
connectedBootstrap = bootstrap.connect(unixDomainSocketPath: socketPath)
40-
}
41-
42-
return connectedBootstrap.flatMap { channel in
27+
return bootstrap.connect(to: serverAddress).flatMap { channel in
4328
let connection = PostgreSQLConnection(queue: handler, channel: channel)
44-
if case .tls(let tlsConfiguration) = transport.method {
45-
return connection.addSSLClientHandler(using: tlsConfiguration).transform(to: connection)
46-
} else {
47-
return worker.eventLoop.newSucceededFuture(result: connection)
29+
switch transport.storage {
30+
case .cleartext:
31+
return worker.future(connection)
32+
case .tls(let tlsConfig):
33+
return connection.addSSLClientHandler(using: tlsConfig).transform(to: connection)
4834
}
4935
}
5036
}
5137
}
5238

53-
extension ChannelPipeline {
54-
func addPostgreSQLClientHandlers(first: Bool = false) -> EventLoopFuture<Void> {
55-
return addHandlers(PostgreSQLMessageEncoder(), PostgreSQLMessageDecoder(), first: first)
56-
}
57-
58-
/// Adds the provided channel handlers to the pipeline in the order given, taking account
59-
/// of the behaviour of `ChannelHandler.add(first:)`.
60-
private func addHandlers(_ handlers: ChannelHandler..., first: Bool) -> EventLoopFuture<Void> {
61-
var handlers = handlers
62-
if first {
63-
handlers = handlers.reversed()
39+
private extension ClientBootstrap {
40+
/// PostgreSQL specific address connect.
41+
func connect(to serverAddress: PostgreSQLConnection.ServerAddress) -> Future<Channel> {
42+
switch serverAddress.storage {
43+
case .socketAddress(let socketAddress): return connect(to: socketAddress)
44+
case .tcp(let hostname, let port): return connect(host: hostname, port: port)
45+
case .unixSocket(let path): return connect(unixDomainSocketPath: path)
6446
}
65-
66-
return EventLoopFuture<Void>.andAll(handlers.map { add(handler: $0) }, eventLoop: eventLoop)
6747
}
6848
}
Lines changed: 33 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1,35 +1,33 @@
1-
//extension PostgreSQLConnection {
2-
// /// Note: after calling `listen'` on a connection, it can no longer handle other database operations. Do not try to send other SQL commands through this connection afterwards.
3-
// /// IAlso, notifications will only be sent for as long as this connection remains open; you are responsible for opening a new connection to listen on when this one closes.
4-
// internal func listen(_ channelName: String, handler: @escaping (String) throws -> ()) throws -> Future<Void> {
5-
// closeHandlers.append({ conn in
6-
// let query = PostgreSQLQuery(query: "UNLISTEN \"\(channelName)\";")
7-
// return conn.send([.query(query)], onResponse: { _ in })
8-
// })
9-
//
10-
// notificationHandlers[channelName] = { message in
11-
// try handler(message)
12-
// }
13-
// let query = PostgreSQLQuery(query: "LISTEN \"\(channelName)\";")
14-
// return queue.enqueue([.query(query)], onInput: { message in
15-
// switch message {
16-
// case let .notificationResponse(notification):
17-
// try self.notificationHandlers[notification.channel]?(notification.message)
18-
// default:
19-
// break
20-
// }
21-
// return false
22-
// })
23-
// }
24-
//
25-
// internal func notify(_ channelName: String, message: String) throws -> Future<Void> {
26-
// let query = PostgreSQLQuery(query: "NOTIFY \"\(channelName)\", '\(message)';")
27-
// return send([.query(query)]).map(to: Void.self, { _ in })
28-
// }
29-
//
30-
// internal func unlisten(_ channelName: String, unlistenHandler: (() -> Void)? = nil) throws -> Future<Void> {
31-
// notificationHandlers.removeValue(forKey: channelName)
32-
// let query = PostgreSQLQuery(query: "UNLISTEN \"\(channelName)\";")
33-
// return send([.query(query)], onResponse: { _ in unlistenHandler?() })
34-
// }
35-
//}
1+
extension PostgreSQLConnection {
2+
internal func listen(_ channelName: String, handler: @escaping (String) throws -> ()) throws -> Future<Void> {
3+
closeHandlers.append({ conn in
4+
let query = PostgreSQLQuery(query: "UNLISTEN \"\(channelName)\";")
5+
return conn.send([.query(query)], onResponse: { _ in })
6+
})
7+
8+
notificationHandlers[channelName] = { message in
9+
try handler(message)
10+
}
11+
let query = PostgreSQLQuery(query: "LISTEN \"\(channelName)\";")
12+
return queue.enqueue([.query(query)], onInput: { message in
13+
switch message {
14+
case let .notificationResponse(notification):
15+
try self.notificationHandlers[notification.channel]?(notification.message)
16+
default:
17+
break
18+
}
19+
return false
20+
})
21+
}
22+
23+
internal func notify(_ channelName: String, message: String) throws -> Future<Void> {
24+
let query = PostgreSQLQuery(query: "NOTIFY \"\(channelName)\", '\(message)';")
25+
return send([.query(query)]).map(to: Void.self, { _ in })
26+
}
27+
28+
internal func unlisten(_ channelName: String, unlistenHandler: (() -> Void)? = nil) throws -> Future<Void> {
29+
notificationHandlers.removeValue(forKey: channelName)
30+
let query = PostgreSQLQuery(query: "UNLISTEN \"\(channelName)\";")
31+
return send([.query(query)], onResponse: { _ in unlistenHandler?() })
32+
}
33+
}

0 commit comments

Comments
 (0)