@@ -57,14 +57,10 @@ class SimpleQueryStateMachineTests: XCTestCase {
5757 let input : [ RowDescription . Column ] = [
5858 . init( name: " version " , tableOID: 0 , columnAttributeNumber: 0 , dataType: . text, dataTypeSize: - 1 , dataTypeModifier: - 1 , format: . text)
5959 ]
60- let expected : [ RowDescription . Column ] = input. map {
61- . init( name: $0. name, tableOID: $0. tableOID, columnAttributeNumber: $0. columnAttributeNumber, dataType: $0. dataType,
62- dataTypeSize: $0. dataTypeSize, dataTypeModifier: $0. dataTypeModifier, format: . text)
63- }
6460
6561 XCTAssertEqual ( state. rowDescriptionReceived ( . init( columns: input) ) , . wait)
6662 let row1 : DataRow = [ ByteBuffer ( string: " test1 " ) ]
67- let result = QueryResult ( value: . rowDescription( expected ) , logger: queryContext. logger)
63+ let result = QueryResult ( value: . rowDescription( input ) , logger: queryContext. logger)
6864 XCTAssertEqual ( state. dataRowReceived ( row1) , . succeedQuery( promise, with: result) )
6965 XCTAssertEqual ( state. channelReadComplete ( ) , . forwardRows( [ row1] ) )
7066 XCTAssertEqual ( state. readEventCaught ( ) , . wait)
@@ -128,7 +124,7 @@ class SimpleQueryStateMachineTests: XCTestCase {
128124 . failQuery( promise, with: psqlError, cleanupContext: . init( action: . close, tasks: [ ] , error: psqlError, closePromise: nil ) ) )
129125 }
130126
131- func testQueryIsCancelledImmediatly ( ) {
127+ func testQueryIsCancelledImmediately ( ) {
132128 var state = ConnectionStateMachine . readyForQuery ( )
133129
134130 let logger = Logger . psqlTest
@@ -183,6 +179,46 @@ class SimpleQueryStateMachineTests: XCTestCase {
183179 XCTAssertEqual ( state. readyForQueryReceived ( . idle) , . fireEventReadyForQuery)
184180 }
185181
182+ func testQueryIsCancelledWithReadPendingWhileStreaming( ) {
183+ var state = ConnectionStateMachine . readyForQuery ( )
184+
185+ let logger = Logger . psqlTest
186+ let promise = EmbeddedEventLoop ( ) . makePromise ( of: PSQLRowStream . self)
187+ promise. fail ( PSQLError . uncleanShutdown) // we don't care about the error at all.
188+ let query = " SELECT version() "
189+ let queryContext = SimpleQueryContext ( query: query, logger: logger, promise: promise)
190+
191+ XCTAssertEqual ( state. enqueue ( task: . simpleQuery( queryContext) ) , . sendQuery( query) )
192+
193+ // We need to ensure that even though the row description from the wire says that we
194+ // will receive data in `.text` format, we will actually receive it in binary format,
195+ // since we requested it in binary with our bind message.
196+ let input : [ RowDescription . Column ] = [
197+ . init( name: " version " , tableOID: 0 , columnAttributeNumber: 0 , dataType: . text, dataTypeSize: - 1 , dataTypeModifier: - 1 , format: . text)
198+ ]
199+
200+ XCTAssertEqual ( state. rowDescriptionReceived ( . init( columns: input) ) , . wait)
201+ let row1 : DataRow = [ ByteBuffer ( string: " test1 " ) ]
202+ let result = QueryResult ( value: . rowDescription( input) , logger: queryContext. logger)
203+ XCTAssertEqual ( state. dataRowReceived ( row1) , . succeedQuery( promise, with: result) )
204+ XCTAssertEqual ( state. cancelQueryStream ( ) , . forwardStreamError( . queryCancelled, read: false , cleanupContext: nil ) )
205+ XCTAssertEqual ( state. channelReadComplete ( ) , . wait)
206+ XCTAssertEqual ( state. readEventCaught ( ) , . read)
207+
208+ XCTAssertEqual ( state. dataRowReceived ( [ ByteBuffer ( string: " test2 " ) ] ) , . wait)
209+ XCTAssertEqual ( state. dataRowReceived ( [ ByteBuffer ( string: " test3 " ) ] ) , . wait)
210+ XCTAssertEqual ( state. dataRowReceived ( [ ByteBuffer ( string: " test4 " ) ] ) , . wait)
211+ XCTAssertEqual ( state. channelReadComplete ( ) , . wait)
212+ XCTAssertEqual ( state. readEventCaught ( ) , . read)
213+
214+ XCTAssertEqual ( state. channelReadComplete ( ) , . wait)
215+ XCTAssertEqual ( state. readEventCaught ( ) , . read)
216+
217+ XCTAssertEqual ( state. commandCompletedReceived ( " SELECT 2 " ) , . wait)
218+ XCTAssertEqual ( state. readyForQueryReceived ( . idle) , . fireEventReadyForQuery)
219+ }
220+
221+
186222 func testCancelQueryAfterServerError( ) {
187223 var state = ConnectionStateMachine . readyForQuery ( )
188224
@@ -200,13 +236,9 @@ class SimpleQueryStateMachineTests: XCTestCase {
200236 let input : [ RowDescription . Column ] = [
201237 . init( name: " version " , tableOID: 0 , columnAttributeNumber: 0 , dataType: . text, dataTypeSize: - 1 , dataTypeModifier: - 1 , format: . text)
202238 ]
203- let expected : [ RowDescription . Column ] = input. map {
204- . init( name: $0. name, tableOID: $0. tableOID, columnAttributeNumber: $0. columnAttributeNumber, dataType: $0. dataType,
205- dataTypeSize: $0. dataTypeSize, dataTypeModifier: $0. dataTypeModifier, format: . text)
206- }
207239
208240 XCTAssertEqual ( state. rowDescriptionReceived ( . init( columns: input) ) , . wait)
209- let result = QueryResult ( value: . rowDescription( expected ) , logger: queryContext. logger)
241+ let result = QueryResult ( value: . rowDescription( input ) , logger: queryContext. logger)
210242 let row1 : DataRow = [ ByteBuffer ( string: " test1 " ) ]
211243 XCTAssertEqual ( state. dataRowReceived ( row1) , . succeedQuery( promise, with: result) )
212244
0 commit comments