diff --git a/LICENSE b/LICENSE
new file mode 100644
index 0000000..5156403
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,7 @@
+Copyright 2017 Roy Myers
+
+Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
diff --git a/README.md b/README.md
index f0e2eb0..569eb77 100644
--- a/README.md
+++ b/README.md
@@ -1,9 +1,80 @@
-Simple kafka/flask server.
-Tutorial to come.
+# Simple kafka/flask server
+Tutorial to come and be linked.
+
+A couple of examples of a Python app that uses Kafka running in a Docker container, the kafka-python library, and Flask as a web server. The intent is to have several branches covering different scenarios that use Kafka.
+ * [kafkaProducerService.py](kafkaProducerService.py)
+   * Currently a multi-threaded Python script that sends messages to a Kafka topic for a consumer to pick up.
+   * Loops for 17 seconds.
+   * The thread sends 2 messages to the Kafka topic 'test' every three seconds.
+   * Only spawns one thread, which acts as the producer.
+   * //TODO: change it to our specific use case for the tutorial.
+ * [server.py](server.py)
+   * A Flask server.
+   * Accepts a POST request.
+   * Submits the POST body to the Kafka topic 'test' as JSON (see the sketch below).
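+
+A rough sketch of the pattern server.py follows is shown below; the route name, port, and serialization details are illustrative placeholders rather than the literal contents of server.py.
+
+```
+import json
+
+from flask import Flask, jsonify, request
+from kafka import KafkaProducer
+
+app = Flask(__name__)
+# Illustrative broker address; kafka-spotify must resolve to the Kafka container (e.g. via /etc/hosts).
+producer = KafkaProducer(bootstrap_servers=['kafka-spotify:9092'])
+
+@app.route('/message', methods=['POST'])  # placeholder route name
+def post_message():
+    # Serialize the POST body to JSON and publish it to the 'test' topic as bytes.
+    payload = json.dumps(request.get_json()).encode('utf-8')
+    producer.send('test', payload)
+    producer.flush()
+    return jsonify({'status': 'queued'}), 202
+
+if __name__ == '__main__':
+    app.run(host='0.0.0.0', port=5000)
+```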
+
+## Getting Started
+
+These instructions will get you a copy of the project up and running on your local machine for development and testing purposes. See deployment for notes on how to deploy the project on a live system `//TODO`.
+
+### Prerequisites
+
+What you need to install and how to install it.
+
+These apps require a couple of Docker containers to run properly. Aside from having Docker installed, the scripts below should be all you need.
+Only one of the containers is used for the actual functionality; the other just relays Kafka messages as a constant output feed and would be replaced in any kind of deployment. For those reasons I didn't create a docker-compose config.
+```
+scripts/createTopicOnExistingKafka.sh - Runs a command on the running Kafka container to create a topic.
+scripts/listTopicOnExistingKafka.sh - Runs a command on the existing Kafka container to list topics and confirm the topic was created.
+scripts/listenToKafkaTopic.sh - Spawns a temporary Docker container that consumes from the Kafka topic and prints each message to the screen.
+scripts/startMasterZookeeperContainer.sh - Creates a Docker network, then starts a background container running Zookeeper and Kafka together.
+```
+
+Order to run:
+ * `startMasterZookeeperContainer.sh` > Output is the container ID.
+ * `createTopicOnExistingKafka.sh` > Output is something along the lines of "Topic created".
+ * `listTopicOnExistingKafka.sh` > Output is something along the lines of "current topics:".
+ * `listenToKafkaTopic.sh` > Output is nothing at first, then the Kafka messages as they are consumed. The container exits when you Ctrl-C.
+
+You will end up with a persistent container running Kafka and Zookeeper in the background, plus a container relaying messages from the Kafka topic to the terminal.
+
+### Installing
+
+A step-by-step series of examples that tells you how to get a development environment running. As always with Python, a virtual environment is suggested.
+
+
+Install the python packages
+
+```
+pip install -r requirements.txt
+```
+
+
+Then you can run the apps.
+
+```
+python server.py
+```
+or
+```
+python kafkaProducerService.py
+```
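+
+To sanity-check from Python (instead of `listenToKafkaTopic.sh`) that messages are actually reaching the 'test' topic, something along these lines should work, assuming the same `kafka-spotify:9092` broker address the scripts use:
+
+```
+from kafka import KafkaConsumer
+
+# Read the topic from the beginning, then stop after 5 seconds with no new messages.
+consumer = KafkaConsumer('test',
+                         bootstrap_servers=['kafka-spotify:9092'],
+                         auto_offset_reset='earliest',
+                         consumer_timeout_ms=5000)
+for message in consumer:
+    print(message.value)
+consumer.close()
+```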
+
+
+## Authors
+
+* **Roy Myers** - *Initial work* - [rmyers19](https://github.optum.com/rmyers19)
+
+
+## License
+
+This project is licensed under the MIT License - see the [LICENSE](LICENSE) file for details
 
 ----
-## Links
+## Links and Thanks!
 [kafka-python](http://kafka-python.readthedocs.io/en/master/)
 [Flask](http://flask.pocoo.org/)
 [Example kafka-python program/server](https://github.com/dpkp/kafka-python/blob/master/example.py)
diff --git a/kafkaConsumerService.py b/kafkaConsumerService.py
new file mode 100644
index 0000000..a0580eb
--- /dev/null
+++ b/kafkaConsumerService.py
@@ -0,0 +1,96 @@
+import threading, logging, time
+import multiprocessing
+import os
+
+from kafka import KafkaConsumer
+
+
+#Consumer process that continuously consumes messages from a Kafka message queue/topic
+# Input:
+#   multiprocessing.Process = base class, so each Consumer runs as its own process
+class Consumer(multiprocessing.Process):
+    #default initializing function of the process.
+    # Input:
+    #   takes self to modify and initialize
+    def __init__(self):
+        #initializes the process and passes self. Magic multiprocessing stuff
+        multiprocessing.Process.__init__(self)
+        #Gives the process an event called stop_event so it can be interrupted.
+        self.stop_event = multiprocessing.Event()
+
+    #Function to stop the process
+    def stop(self):
+        #Sets the stop_event event.
+        #This gives context to the process from the outside and lets you stop it.
+        self.stop_event.set()
+
+    #The main run function called when you call start().
+    def run(self):
+        #Use the parent process id to build a unique client id.
+        #os.getppid is only available on Unix (on Python 2); fall back to our own pid so procID is always defined.
+        if hasattr(os, 'getppid'):
+            procID = os.getppid()
+        else:
+            procID = os.getpid()
+        print('parent process: %s' % procID)
+
+        #Bootstraps an instance of a Kafka consumer and points it at the docker server.
+        #kafka-spotify is listed in /etc/hosts with the ip of the container.
+        #Input:
+        #   topic to subscribe to: 'test'
+        #   client_id to identify the consumer; should be unique to the connection
+        #   servers Kafka is advertising as
+        #   offset reset policy: 'latest' only reads new messages; 'earliest' would grab the earliest unprocessed message
+        #   timeout limit so iterating over the consumer gives up when no message arrives
+        consumer = KafkaConsumer('test',
+                                 client_id='python-consumer-%s' % (procID),
+                                 bootstrap_servers=['kafka-spotify:9092'],
+                                 auto_offset_reset='latest',
+                                 consumer_timeout_ms=1000)
+
+        #Alternative way to subscribe to a topic
+        #consumer.subscribe(['test'])
+
+        #loop until the process is stopped by checking the stop event
+        while not self.stop_event.is_set():
+            #Loop through ConsumerRecord objects in the consumer object
+            for message in consumer:
+                #Simpler alternative: print the whole record with a note of the client ID
+                #print("python-consumer-%s processed message: %s" % (procID, message))
+
+                #print the message with a note of the client ID, current topic, offset number,
+                #and the value decoded from the bytes it is sent as
+                print("python-consumer-%s processed message: %s:%d: value=%s" % (procID, message.topic,
+                                                                                 message.offset, message.value.decode('utf-8')))
+                #break out of the for loop if the process was notified of closure
+                if self.stop_event.is_set():
+                    break
+
+        #Close the TCP connection to kafka
+        consumer.close()
+
+
+#Main function called when the app is run
+def main():
+    #initialize a Consumer object/process
+    kafkConsumer = Consumer()
+
+    #Start the process working.
+    kafkConsumer.start()
+
+    #Sleep for 20 seconds. Because the consumer runs in its own process, this only pauses the main process.
+    time.sleep(20)
+
+    #Call stop to set the stop event so the process knows to stop
+    print("Stopping kafkConsumer")
+    kafkConsumer.stop()
+
+    #Wait until the process terminates. See the docs for more:
+    #https://docs.python.org/2/library/threading.html?highlight=thread#threading.Thread.join
+    print("Waiting for execution to halt")
+    kafkConsumer.join()
+
+#the logic to run when executed as a script
+if __name__ == "__main__":
+    #Set logging format and level
+    #logging.basicConfig(
+    #    format='%(asctime)s.%(msecs)s:%(name)s:%(thread)d:%(levelname)s:%(process)d:%(message)s',
+    #    level=logging.INFO)
+
+    #Call the main function
+    main()
+
diff --git a/kafkaProducerService.py b/kafkaProducerService.py
index 5d10fa7..50d2f26 100644
--- a/kafkaProducerService.py
+++ b/kafkaProducerService.py
@@ -1,5 +1,4 @@
 import threading, logging, time
-import multiprocessing
 
 from kafka import KafkaProducer
 
@@ -34,7 +33,7 @@ def run(self):
         while not self.stop_event.is_set():
             #Send two messages of type binary to the 'test' Topic
             producer.send('test', b"test")
-            producer.send('test', b"\xc2Hola, mundo!")
+            producer.send('test', b"Hola, mundo!")
 
             #Sleep for 3 seconds
             time.sleep(3)