Setup

The usual way to install a Python Kafka client library is with pip:

pip install kafka-python

Sending and Receiving messages

A simple demo producer:

import json
import threading
import time

from kafka import KafkaConsumer, KafkaProducer
 
class Producer(threading.Thread):
    """Demo thread that publishes JSON test messages to Kafka forever.

    Once started, the thread connects to the broker at
    ``victoria.com:6667`` and sends two small JSON documents to the
    ``my-topic`` topic every second. It never returns on its own.
    """

    # Daemon threads do not keep the interpreter alive at shutdown, which
    # matters here because run() loops forever.
    daemon = True

    def run(self):
        """Create a JSON-serializing producer and send messages in a loop."""
        # Each value is dumped to a JSON string and encoded as UTF-8 bytes
        # before being handed to the producer.
        producer = KafkaProducer(
            bootstrap_servers='victoria.com:6667',
            value_serializer=lambda v: json.dumps(v).encode('utf-8'),
        )

        # We can then send JSON-serializable data directly.
        while True:
            producer.send('my-topic', {"dataObjectID": "test1"})
            producer.send('my-topic', {"dataObjectID": "test2"})
            # Throttle the demo to one pair of messages per second.
            time.sleep(1)
 

And a similarly simple consumer

class Consumer(threading.Thread):
    """Demo thread that prints every message received from ``my-topic``.

    Connects to the broker at ``victoria.com:6667`` with
    ``auto_offset_reset='earliest'`` and decodes each message value from
    UTF-8 JSON before printing the received message.
    """

    # Daemon threads do not keep the interpreter alive at shutdown.
    daemon = True

    def run(self):
        """Subscribe to the topic and print messages as they arrive."""

        def decode_json(raw):
            # Message values arrive as bytes: decode as UTF-8, then parse JSON.
            return json.loads(raw.decode('utf-8'))

        reader = KafkaConsumer(
            bootstrap_servers='victoria.com:6667',
            auto_offset_reset='earliest',
            value_deserializer=decode_json,
        )
        reader.subscribe(['my-topic'])

        # Iterating the consumer yields one received message at a time.
        for record in reader:
            print(record)
 
python_kafka.txt · Last modified: 2019/07/24 00:30 by root
 
RSS - 200 © CrosswireDigitialMedia Ltd