diff --git a/consumer.c b/consumer.c
index 19a5b5f..8c4a6ca 100644
--- a/consumer.c
+++ b/consumer.c
@@ -433,48 +433,6 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, close)
}
/* }}} */
-/* {{{ proto Metadata SimpleKafkaClient\Consumer::getMetadata(bool all_topics, int timeout_ms, SimpleKafkaClient\Topic only_topic = null)
- Request Metadata from broker */
-ZEND_METHOD(SimpleKafkaClient_Consumer, getMetadata)
-{
- zend_bool all_topics;
- zval *only_zrkt = NULL;
- zend_long timeout_ms;
- rd_kafka_resp_err_t err;
- kafka_object *intern;
- const rd_kafka_metadata_t *metadata;
- kafka_topic_object *only_orkt = NULL;
-
- ZEND_PARSE_PARAMETERS_START_EX(ZEND_PARSE_PARAMS_THROW, 2, 3)
- Z_PARAM_BOOL(all_topics)
- Z_PARAM_LONG(timeout_ms)
- Z_PARAM_OPTIONAL
- Z_PARAM_OBJECT_OF_CLASS(only_zrkt, ce_kafka_topic)
- ZEND_PARSE_PARAMETERS_END();
-
- intern = get_kafka_object(getThis());
- if (!intern) {
- return;
- }
-
- if (only_zrkt) {
- only_orkt = get_kafka_topic_object(only_zrkt);
- if (!only_orkt) {
- return;
- }
- }
-
- err = rd_kafka_metadata(intern->rk, all_topics, only_orkt ? only_orkt->rkt : NULL, &metadata, timeout_ms);
-
- if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
- zend_throw_exception(ce_kafka_exception, rd_kafka_err2str(err), err);
- return;
- }
-
- kafka_metadata_obj_init(return_value, metadata);
-}
-/* }}} */
-
/* {{{ proto SimpleKafkaClient\ConsumerTopic SimpleKafkaClient\Consumer::getTopicHandle(string $topic)
Returns a SimpleKafkaClient\ConsumerTopic object */
ZEND_METHOD(SimpleKafkaClient_Consumer, getTopicHandle)
@@ -587,79 +545,3 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, getOffsetPositions)
}
/* }}} */
-/* {{{ proto void SimpleKafkaClient\Consumer::offsetsForTimes(array $topicPartitions, int $timeout_ms)
- Look up the offsets for the given partitions by timestamp. */
-ZEND_METHOD(SimpleKafkaClient_Consumer, offsetsForTimes)
-{
- HashTable *htopars = NULL;
- kafka_object *intern;
- rd_kafka_topic_partition_list_t *topicPartitions;
- zend_long timeout_ms;
- rd_kafka_resp_err_t err;
-
- ZEND_PARSE_PARAMETERS_START_EX(ZEND_PARSE_PARAMS_THROW, 2, 2)
- Z_PARAM_ARRAY_HT(htopars)
- Z_PARAM_LONG(timeout_ms)
- ZEND_PARSE_PARAMETERS_END();
-
- intern = get_kafka_object(getThis());
- if (!intern) {
- return;
- }
-
- topicPartitions = array_arg_to_kafka_topic_partition_list(1, htopars);
- if (!topicPartitions) {
- return;
- }
-
- err = rd_kafka_offsets_for_times(intern->rk, topicPartitions, timeout_ms);
-
- if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
- rd_kafka_topic_partition_list_destroy(topicPartitions);
- zend_throw_exception(ce_kafka_exception, rd_kafka_err2str(err), err);
- return;
- }
- kafka_topic_partition_list_to_array(return_value, topicPartitions);
- rd_kafka_topic_partition_list_destroy(topicPartitions);
-}
-/* }}} */
-
-/* {{{ proto void SimpleKafkaClient\Consumer::queryWatermarkOffsets(string $topic, int $partition, int &$low, int &$high, int $timeout_ms)
- Query broker for low (oldest/beginning) or high (newest/end) offsets for partition */
-ZEND_METHOD(SimpleKafkaClient_Consumer, queryWatermarkOffsets)
-{
- kafka_object *intern;
- char *topic;
- size_t topic_length;
- long low, high;
- zend_long partition, timeout_ms;
- zval *lowResult, *highResult;
- rd_kafka_resp_err_t err;
-
- ZEND_PARSE_PARAMETERS_START_EX(ZEND_PARSE_PARAMS_THROW, 2, 2)
- Z_PARAM_STRING(topic, topic_length)
- Z_PARAM_LONG(partition)
- Z_PARAM_ZVAL(lowResult)
- Z_PARAM_ZVAL(highResult)
- Z_PARAM_LONG(timeout_ms)
- ZEND_PARSE_PARAMETERS_END();
-
- ZVAL_DEREF(lowResult);
- ZVAL_DEREF(highResult);
-
- intern = get_kafka_object(getThis());
- if (!intern) {
- return;
- }
-
- err = rd_kafka_query_watermark_offsets(intern->rk, topic, partition, &low, &high, timeout_ms);
-
- if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
- zend_throw_exception(ce_kafka_exception, rd_kafka_err2str(err), err);
- return;
- }
-
- ZVAL_LONG(lowResult, low);
- ZVAL_LONG(highResult, high);
-}
-/* }}} */
diff --git a/consumer.stub.php b/consumer.stub.php
index 37e20f6..d7e56d1 100644
--- a/consumer.stub.php
+++ b/consumer.stub.php
@@ -28,15 +28,9 @@ public function commitAsync($messageOrOffsets): void {}
public function close(): void {}
- public function getMetadata(bool $allTopics, int $timeoutMs, ConsumerTopic $topic): Metadata {}
-
public function getTopicHandle(string $topic): ConsumerTopic {}
public function getCommittedOffsets(array $topics, int $timeoutMs): array {}
public function getOffsetPositions(array $topics): array {}
-
- public function offsetsForTimes(array $topicPartitions, int $timeoutMs): array {}
-
- public function queryWatermarkOffsets(string $topic, int $partition, int &$low, int &$high, int $timeoutMs): void {}
}
diff --git a/consumer_arginfo.h b/consumer_arginfo.h
index 0691ebe..c33e529 100644
--- a/consumer_arginfo.h
+++ b/consumer_arginfo.h
@@ -1,5 +1,5 @@
/* This is a generated file, edit the .stub.php file instead.
- * Stub hash: 091c6b60081bb08ec174ef87b9cc6d2b3fbba461 */
+ * Stub hash: 378cc029a3673afe02572e7e17fde17e47b2aefd */
ZEND_BEGIN_ARG_INFO_EX(arginfo_class_SimpleKafkaClient_Consumer___construct, 0, 0, 1)
ZEND_ARG_OBJ_INFO(0, configuration, SimpleKafkaClient\\Configuration, 0)
@@ -33,12 +33,6 @@ ZEND_END_ARG_INFO()
#define arginfo_class_SimpleKafkaClient_Consumer_close arginfo_class_SimpleKafkaClient_Consumer_unsubscribe
-ZEND_BEGIN_ARG_WITH_RETURN_OBJ_INFO_EX(arginfo_class_SimpleKafkaClient_Consumer_getMetadata, 0, 3, SimpleKafkaClient\\Metadata, 0)
- ZEND_ARG_TYPE_INFO(0, allTopics, _IS_BOOL, 0)
- ZEND_ARG_TYPE_INFO(0, timeoutMs, IS_LONG, 0)
- ZEND_ARG_OBJ_INFO(0, topic, SimpleKafkaClient\\ConsumerTopic, 0)
-ZEND_END_ARG_INFO()
-
ZEND_BEGIN_ARG_WITH_RETURN_OBJ_INFO_EX(arginfo_class_SimpleKafkaClient_Consumer_getTopicHandle, 0, 1, SimpleKafkaClient\\ConsumerTopic, 0)
ZEND_ARG_TYPE_INFO(0, topic, IS_STRING, 0)
ZEND_END_ARG_INFO()
@@ -52,19 +46,6 @@ ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_SimpleKafkaClient_Consumer
ZEND_ARG_TYPE_INFO(0, topics, IS_ARRAY, 0)
ZEND_END_ARG_INFO()
-ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_SimpleKafkaClient_Consumer_offsetsForTimes, 0, 2, IS_ARRAY, 0)
- ZEND_ARG_TYPE_INFO(0, topicPartitions, IS_ARRAY, 0)
- ZEND_ARG_TYPE_INFO(0, timeoutMs, IS_LONG, 0)
-ZEND_END_ARG_INFO()
-
-ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_SimpleKafkaClient_Consumer_queryWatermarkOffsets, 0, 5, IS_VOID, 0)
- ZEND_ARG_TYPE_INFO(0, topic, IS_STRING, 0)
- ZEND_ARG_TYPE_INFO(0, partition, IS_LONG, 0)
- ZEND_ARG_TYPE_INFO(1, low, IS_LONG, 0)
- ZEND_ARG_TYPE_INFO(1, high, IS_LONG, 0)
- ZEND_ARG_TYPE_INFO(0, timeoutMs, IS_LONG, 0)
-ZEND_END_ARG_INFO()
-
ZEND_METHOD(SimpleKafkaClient_Consumer, __construct);
ZEND_METHOD(SimpleKafkaClient_Consumer, assign);
@@ -76,12 +57,9 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, consume);
ZEND_METHOD(SimpleKafkaClient_Consumer, commit);
ZEND_METHOD(SimpleKafkaClient_Consumer, commitAsync);
ZEND_METHOD(SimpleKafkaClient_Consumer, close);
-ZEND_METHOD(SimpleKafkaClient_Consumer, getMetadata);
ZEND_METHOD(SimpleKafkaClient_Consumer, getTopicHandle);
ZEND_METHOD(SimpleKafkaClient_Consumer, getCommittedOffsets);
ZEND_METHOD(SimpleKafkaClient_Consumer, getOffsetPositions);
-ZEND_METHOD(SimpleKafkaClient_Consumer, offsetsForTimes);
-ZEND_METHOD(SimpleKafkaClient_Consumer, queryWatermarkOffsets);
static const zend_function_entry class_SimpleKafkaClient_Consumer_methods[] = {
@@ -95,11 +73,8 @@ static const zend_function_entry class_SimpleKafkaClient_Consumer_methods[] = {
ZEND_ME(SimpleKafkaClient_Consumer, commit, arginfo_class_SimpleKafkaClient_Consumer_commit, ZEND_ACC_PUBLIC)
ZEND_ME(SimpleKafkaClient_Consumer, commitAsync, arginfo_class_SimpleKafkaClient_Consumer_commitAsync, ZEND_ACC_PUBLIC)
ZEND_ME(SimpleKafkaClient_Consumer, close, arginfo_class_SimpleKafkaClient_Consumer_close, ZEND_ACC_PUBLIC)
- ZEND_ME(SimpleKafkaClient_Consumer, getMetadata, arginfo_class_SimpleKafkaClient_Consumer_getMetadata, ZEND_ACC_PUBLIC)
ZEND_ME(SimpleKafkaClient_Consumer, getTopicHandle, arginfo_class_SimpleKafkaClient_Consumer_getTopicHandle, ZEND_ACC_PUBLIC)
ZEND_ME(SimpleKafkaClient_Consumer, getCommittedOffsets, arginfo_class_SimpleKafkaClient_Consumer_getCommittedOffsets, ZEND_ACC_PUBLIC)
ZEND_ME(SimpleKafkaClient_Consumer, getOffsetPositions, arginfo_class_SimpleKafkaClient_Consumer_getOffsetPositions, ZEND_ACC_PUBLIC)
- ZEND_ME(SimpleKafkaClient_Consumer, offsetsForTimes, arginfo_class_SimpleKafkaClient_Consumer_offsetsForTimes, ZEND_ACC_PUBLIC)
- ZEND_ME(SimpleKafkaClient_Consumer, queryWatermarkOffsets, arginfo_class_SimpleKafkaClient_Consumer_queryWatermarkOffsets, ZEND_ACC_PUBLIC)
ZEND_FE_END
};
diff --git a/package.xml b/package.xml
index 1a784f8..43365dc 100644
--- a/package.xml
+++ b/package.xml
@@ -42,7 +42,7 @@
-
+
diff --git a/producer.c b/producer.c
index 1c012fb..29c5ba4 100644
--- a/producer.c
+++ b/producer.c
@@ -120,6 +120,26 @@ ZEND_METHOD(SimpleKafkaClient_Producer, flush)
}
/* }}} */
+/* {{{ proto int SimpleKafkaClient\Producer::poll(int $timeoutMs)
+ Polls the provided kafka handle for events */
+ZEND_METHOD(SimpleKafkaClient_Producer, poll)
+{
+ kafka_object *intern;
+ zend_long timeout_ms;
+
+ ZEND_PARSE_PARAMETERS_START_EX(ZEND_PARSE_PARAMS_THROW, 1, 1)
+ Z_PARAM_LONG(timeout_ms)
+ ZEND_PARSE_PARAMETERS_END();
+
+ intern = get_kafka_object(getThis());
+ if (!intern) {
+ return;
+ }
+
+ RETURN_LONG(rd_kafka_poll(intern->rk, timeout_ms));
+}
+/* }}} */
+
/* {{{ proto int SimpleKafkaClient\Producer::purge(int $purge_flags)
Purge messages that are in queue or in flight */
ZEND_METHOD(SimpleKafkaClient_Producer, purge)
diff --git a/producer.stub.php b/producer.stub.php
index f193737..f81610e 100644
--- a/producer.stub.php
+++ b/producer.stub.php
@@ -18,6 +18,8 @@ public function abortTransaction(int $timeoutMs): void {}
public function flush(int $timeoutMs): int {}
+ public function poll(int $timeoutMs): int {}
+
public function purge(int $purgeFlags): int {}
public function getTopicHandle(string $topic): ProducerTopic {}
diff --git a/producer_arginfo.h b/producer_arginfo.h
index 28b0435..8c4e343 100644
--- a/producer_arginfo.h
+++ b/producer_arginfo.h
@@ -1,5 +1,5 @@
/* This is a generated file, edit the .stub.php file instead.
- * Stub hash: ae03dd8127a9e4799e241bc490de200ff18a4178 */
+ * Stub hash: 30c864ad8163b67989b699e8e94c4fe6539a5386 */
ZEND_BEGIN_ARG_INFO_EX(arginfo_class_SimpleKafkaClient_Producer___construct, 0, 0, 1)
ZEND_ARG_OBJ_INFO(0, configuration, SimpleKafkaClient\\Configuration, 0)
@@ -20,6 +20,8 @@ ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_SimpleKafkaClient_Producer
ZEND_ARG_TYPE_INFO(0, timeoutMs, IS_LONG, 0)
ZEND_END_ARG_INFO()
+#define arginfo_class_SimpleKafkaClient_Producer_poll arginfo_class_SimpleKafkaClient_Producer_flush
+
ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_SimpleKafkaClient_Producer_purge, 0, 1, IS_LONG, 0)
ZEND_ARG_TYPE_INFO(0, purgeFlags, IS_LONG, 0)
ZEND_END_ARG_INFO()
@@ -35,6 +37,7 @@ ZEND_METHOD(SimpleKafkaClient_Producer, beginTransaction);
ZEND_METHOD(SimpleKafkaClient_Producer, commitTransaction);
ZEND_METHOD(SimpleKafkaClient_Producer, abortTransaction);
ZEND_METHOD(SimpleKafkaClient_Producer, flush);
+ZEND_METHOD(SimpleKafkaClient_Producer, poll);
ZEND_METHOD(SimpleKafkaClient_Producer, purge);
ZEND_METHOD(SimpleKafkaClient_Producer, getTopicHandle);
@@ -46,6 +49,7 @@ static const zend_function_entry class_SimpleKafkaClient_Producer_methods[] = {
ZEND_ME(SimpleKafkaClient_Producer, commitTransaction, arginfo_class_SimpleKafkaClient_Producer_commitTransaction, ZEND_ACC_PUBLIC)
ZEND_ME(SimpleKafkaClient_Producer, abortTransaction, arginfo_class_SimpleKafkaClient_Producer_abortTransaction, ZEND_ACC_PUBLIC)
ZEND_ME(SimpleKafkaClient_Producer, flush, arginfo_class_SimpleKafkaClient_Producer_flush, ZEND_ACC_PUBLIC)
+ ZEND_ME(SimpleKafkaClient_Producer, poll, arginfo_class_SimpleKafkaClient_Producer_poll, ZEND_ACC_PUBLIC)
ZEND_ME(SimpleKafkaClient_Producer, purge, arginfo_class_SimpleKafkaClient_Producer_purge, ZEND_ACC_PUBLIC)
ZEND_ME(SimpleKafkaClient_Producer, getTopicHandle, arginfo_class_SimpleKafkaClient_Producer_getTopicHandle, ZEND_ACC_PUBLIC)
ZEND_FE_END
diff --git a/simple_kafka_client.c b/simple_kafka_client.c
index 5b77132..eaf7df2 100644
--- a/simple_kafka_client.c
+++ b/simple_kafka_client.c
@@ -45,7 +45,7 @@
#include "consumer_arginfo.h"
#include "functions_arginfo.h"
#include "producer_arginfo.h"
-#include "kafka_arginfo.h"
+#include "simple_kafka_client_arginfo.h"
enum {
RD_KAFKA_LOG_PRINT = 100
@@ -184,26 +184,6 @@ ZEND_METHOD(SimpleKafkaClient_Kafka, getOutQLen)
}
/* }}} */
-/* {{{ proto int SimpleKafkaClient\Kafka::poll(int $timeoutMs)
- Polls the provided kafka handle for events */
-ZEND_METHOD(SimpleKafkaClient_Kafka, poll)
-{
- kafka_object *intern;
- zend_long timeout_ms;
-
- ZEND_PARSE_PARAMETERS_START_EX(ZEND_PARSE_PARAMS_THROW, 1, 1)
- Z_PARAM_LONG(timeout_ms)
- ZEND_PARSE_PARAMETERS_END();
-
- intern = get_kafka_object(getThis());
- if (!intern) {
- return;
- }
-
- RETURN_LONG(rd_kafka_poll(intern->rk, timeout_ms));
-}
-/* }}} */
-
/* {{{ proto void SimpleKafkaClient\Kafka::queryWatermarkOffsets(string $topic, int $partition, int &$low, int &$high, int $timeout_ms)
Query broker for low (oldest/beginning) or high (newest/end) offsets for partition */
ZEND_METHOD(SimpleKafkaClient_Kafka, queryWatermarkOffsets)
@@ -216,7 +196,7 @@ ZEND_METHOD(SimpleKafkaClient_Kafka, queryWatermarkOffsets)
zval *lowResult, *highResult;
rd_kafka_resp_err_t err;
- ZEND_PARSE_PARAMETERS_START_EX(ZEND_PARSE_PARAMS_THROW, 2, 2)
+ ZEND_PARSE_PARAMETERS_START_EX(ZEND_PARSE_PARAMS_THROW, 5, 5)
Z_PARAM_STRING(topic, topic_length)
Z_PARAM_LONG(partition)
Z_PARAM_ZVAL(lowResult)
@@ -352,7 +332,7 @@ PHP_MINIT_FUNCTION(simple_kafka_client)
ce_kafka_producer = zend_register_internal_class_ex(&ce, ce_kafka);
INIT_NS_CLASS_ENTRY(ce, "SimpleKafkaClient", "Consumer", class_SimpleKafkaClient_Consumer_methods);
- ce_kafka_consumer = zend_register_internal_class(&ce);
+ ce_kafka_consumer = zend_register_internal_class_ex(&ce, ce_kafka);
ce_kafka_consumer->create_object = kafka_new;
kafka_conf_init(INIT_FUNC_ARGS_PASSTHRU);
diff --git a/kafka.stub.php b/simple_kafka_client.stub.php
similarity index 89%
rename from kafka.stub.php
rename to simple_kafka_client.stub.php
index d0fbe3f..de742ab 100644
--- a/kafka.stub.php
+++ b/simple_kafka_client.stub.php
@@ -10,8 +10,6 @@ public function getMetadata(bool $allTopics, int $timeoutMs, Topic $topic): Meta
public function getOutQLen(): int {}
- public function poll(int $timeoutMs): int {}
-
public function queryWatermarkOffsets(string $topic, int $partition, int &$low, int &$high, int $timeoutMs): void {}
public function offsetsForTimes(array $topicPartitions, int $timeoutMs): array {}
diff --git a/kafka_arginfo.h b/simple_kafka_client_arginfo.h
similarity index 83%
rename from kafka_arginfo.h
rename to simple_kafka_client_arginfo.h
index 0b7cd60..cc393b6 100644
--- a/kafka_arginfo.h
+++ b/simple_kafka_client_arginfo.h
@@ -1,5 +1,5 @@
/* This is a generated file, edit the .stub.php file instead.
- * Stub hash: 5620609ea29ca05a20736ac8412bee6e4cc39615 */
+ * Stub hash: e61ec0821ea47152b2ce6b7116ec791c0c712a73 */
ZEND_BEGIN_ARG_WITH_RETURN_OBJ_INFO_EX(arginfo_class_SimpleKafkaClient_Kafka_getMetadata, 0, 3, SimpleKafkaClient\\Metadata, 0)
ZEND_ARG_TYPE_INFO(0, allTopics, _IS_BOOL, 0)
@@ -10,10 +10,6 @@ ZEND_END_ARG_INFO()
ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_SimpleKafkaClient_Kafka_getOutQLen, 0, 0, IS_LONG, 0)
ZEND_END_ARG_INFO()
-ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_SimpleKafkaClient_Kafka_poll, 0, 1, IS_LONG, 0)
- ZEND_ARG_TYPE_INFO(0, timeoutMs, IS_LONG, 0)
-ZEND_END_ARG_INFO()
-
ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_SimpleKafkaClient_Kafka_queryWatermarkOffsets, 0, 5, IS_VOID, 0)
ZEND_ARG_TYPE_INFO(0, topic, IS_STRING, 0)
ZEND_ARG_TYPE_INFO(0, partition, IS_LONG, 0)
@@ -30,7 +26,6 @@ ZEND_END_ARG_INFO()
ZEND_METHOD(SimpleKafkaClient_Kafka, getMetadata);
ZEND_METHOD(SimpleKafkaClient_Kafka, getOutQLen);
-ZEND_METHOD(SimpleKafkaClient_Kafka, poll);
ZEND_METHOD(SimpleKafkaClient_Kafka, queryWatermarkOffsets);
ZEND_METHOD(SimpleKafkaClient_Kafka, offsetsForTimes);
@@ -38,7 +33,6 @@ ZEND_METHOD(SimpleKafkaClient_Kafka, offsetsForTimes);
static const zend_function_entry class_SimpleKafkaClient_Kafka_methods[] = {
ZEND_ME(SimpleKafkaClient_Kafka, getMetadata, arginfo_class_SimpleKafkaClient_Kafka_getMetadata, ZEND_ACC_PUBLIC)
ZEND_ME(SimpleKafkaClient_Kafka, getOutQLen, arginfo_class_SimpleKafkaClient_Kafka_getOutQLen, ZEND_ACC_PUBLIC)
- ZEND_ME(SimpleKafkaClient_Kafka, poll, arginfo_class_SimpleKafkaClient_Kafka_poll, ZEND_ACC_PUBLIC)
ZEND_ME(SimpleKafkaClient_Kafka, queryWatermarkOffsets, arginfo_class_SimpleKafkaClient_Kafka_queryWatermarkOffsets, ZEND_ACC_PUBLIC)
ZEND_ME(SimpleKafkaClient_Kafka, offsetsForTimes, arginfo_class_SimpleKafkaClient_Kafka_offsetsForTimes, ZEND_ACC_PUBLIC)
ZEND_FE_END
diff --git a/tests/offsets_for_times.phpt b/tests/offsets_for_times.phpt
new file mode 100644
index 0000000..43e30e9
--- /dev/null
+++ b/tests/offsets_for_times.phpt
@@ -0,0 +1,36 @@
+--TEST--
+Produce, consume
+--SKIPIF--
+set('client.id', 'pure-php-producer');
+$conf->set('metadata.broker.list', getenv('TEST_KAFKA_BROKERS'));
+
+$producer = new SimpleKafkaClient\Producer($conf);
+$topic = $producer->getTopicHandle('pure-php-test-topic-offsets');
+$time = time();
+$topic->producev(
+ RD_KAFKA_PARTITION_UA,
+ RD_KAFKA_MSG_F_BLOCK, // will block produce if queue is full
+ 'special-message',
+ 'special-key',
+ [
+ 'special-header' => 'awesome'
+ ]
+);
+$result = $producer->flush(20000);
+
+$topicPartition = new SimpleKafkaClient\TopicPartition('pure-php-test-topic-offsets', 0, $time);
+$result = $producer->offsetsForTimes([$topicPartition], 10000);
+var_dump($result[0]->getTopicName());
+var_dump($result[0]->getPartition());
+var_dump($result[0]->getOffset());
+--EXPECT--
+string(27) "pure-php-test-topic-offsets"
+int(0)
+int(0)
diff --git a/tests/query_watermark_offsets.phpt b/tests/query_watermark_offsets.phpt
new file mode 100644
index 0000000..2cba0bb
--- /dev/null
+++ b/tests/query_watermark_offsets.phpt
@@ -0,0 +1,33 @@
+--TEST--
+Produce, consume
+--SKIPIF--
+set('client.id', 'pure-php-producer');
+$conf->set('metadata.broker.list', getenv('TEST_KAFKA_BROKERS'));
+
+$producer = new SimpleKafkaClient\Producer($conf);
+$topic = $producer->getTopicHandle('pure-php-test-topic-watermark');
+$topic->producev(
+ RD_KAFKA_PARTITION_UA,
+ RD_KAFKA_MSG_F_BLOCK, // will block produce if queue is full
+ 'special-message',
+ 'special-key',
+ [
+ 'special-header' => 'awesome'
+ ]
+);
+$result = $producer->flush(20000);
+$high = 0;
+$low = 0;
+$result = $producer->queryWatermarkOffsets('pure-php-test-topic-watermark', 0,$low, $high, 10000);
+var_dump($low);
+var_dump($high);
+--EXPECT--
+int(0)
+int(1)