Skip to content

err-ctrl-alt-del/kafka-clients-util

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

9 Commits
 
 
 
 
 
 
 
 

Repository files navigation

Java Utilities for PySpark Kafka Clients (Kerberos)

Getting Started

git clone https://github.com/gerrard-err/kafka-clients-util.git

Prerequisites

Maven 3, Java 8

Installing

mvn install

Running the tests

mvn test

Deployment

export PYSPARK_SUBMIT_ARGS='--master yarn-client
--jars .ivy2/jars/kafka-clients-util-1.0.jar
--driver-class-path .ivy2/jars/kafka-clients-util-1.0.jar
 pyspark-shell'

Example

from pyspark import SparkContext, SparkConf
from py4j.java_gateway import Py4JNetworkError

log4jLogger = sc._jvm.org.apache.log4j 
log = log4jLogger.LogManager.getLogger(__name__) 

def create_kafka_params():
    sc._jvm.java.lang.System.setProperty("java.security.auth.login.config",
                                         "/usr/hdp/current/kafka-broker/conf/kafka_client_jaas.conf")
    kafkaParams = sc._gateway.jvm.java.util.HashMap()
    kafkaParams.put("bootstrap.servers", "localhost:6667");
    kafkaParams.put("security.protocol", "SASL_PLAINTEXT")
    kafkaParams.put("acks", "all");
    kafkaParams.put("retries", 1);
    kafkaParams.put("key.serializer",
    "org.apache.kafka.common.serialization.StringSerializer");
    kafkaParams.put("value.serializer",
    "org.apache.kafka.common.serialization.StringSerializer");
    return kafkaParams

def send(message):
    try:
        log.info("Creating Kafka producer...")
        kafkaClientsHelperClass = sc._jvm.java.lang.Thread.currentThread().getContextClassLoader()\
                .loadClass("com.github.kafka.clients.KafkaClientsPythonHelper")
        kafkaClientsHelper = kafkaClientsHelperClass.newInstance()
        kafkaClientsHelper.createProducer(create_kafka_params())
        log.info("Sending message...")
        kafkaClientsHelper.send("topic_example", sc._gateway.jvm.java.util.UUID.randomUUID().toString(), message)
    except Py4JJavaError as e:
        if 'ClassNotFoundException' in str(e.java_exception):
            log.error(e.java_exception)
        raise e
    except Exception:
        print("Exception")
    finally:
        log.info("Shutting down Kafka producer...")
        kafkaClientsHelper.shutdownProducer()        

Authors

See also the list of contributors who participated in this project.

License

This project is licensed under the MIT License - see the LICENSE file for details

About

Java Utilities for PySpark Kafka Clients

Topics

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Languages