Nameko-Kafka


Kafka extension for Nameko microservice framework.

Introduction

This is a Nameko microservice framework extension that adds a Kafka entrypoint and dependency. The motivation behind the project is issue 569. Nameko-kafka provides a simple implementation of the entrypoint based on the approach by calumpeterwebb. It also includes a dependency provider for publishing Kafka messages from within a Nameko service.

Installation

The package supports Python >= 3.5.

$ pip install nameko-kafka

Usage

The extension can be used both as a service dependency and as an entrypoint. Example usage for each case is shown in the following sections.

Dependency

This is essentially a python-kafka producer in the form of a Nameko dependency. Nameko uses dependency injection to instantiate the producer. You just need to declare it in your service class as shown:

from nameko.rpc import rpc
from nameko_kafka import KafkaProducer


class MyService:
    """
        My microservice
    """
    name = "my-service"
    # Kafka dependency
    producer = KafkaProducer(bootstrap_servers='localhost:1234')
    
    @rpc
    def method(self):
        # Publish message using dependency
        self.producer.send("kafka-topic", value=b"my-message", key=b"my-key")

Here KafkaProducer accepts all options valid for python-kafka's KafkaProducer.
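
For example, those options can be used to configure serialization. Below is a minimal sketch, assuming JSON-encoded values; value_serializer and retries are standard python-kafka KafkaProducer options, while the broker address and service name are illustrative:

import json

from nameko.rpc import rpc
from nameko_kafka import KafkaProducer


class MyJsonService:
    """
        Service publishing JSON-encoded messages
    """
    name = "my-json-service"
    # Serialize dict values to JSON bytes before sending
    producer = KafkaProducer(
        bootstrap_servers='localhost:1234',
        value_serializer=lambda value: json.dumps(value).encode("utf-8"),
        retries=3,
    )

    @rpc
    def method(self):
        # The dict value is serialized by the producer before publishing
        self.producer.send("kafka-topic", value={"my": "message"}, key=b"my-key")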

Entrypoint

You can use the nameko_kafka.consume decorator in your services to process Kafka messages:

from nameko_kafka import consume


class MyService:
    """
        My microservice 
    """
    name = "my-service"

    @consume("kafka-topic", group_id="my-group", bootstrap_servers='localhost:1234')
    def method(self, message):
        # Your message handler
        handle_message(message) 

The consume decorator accepts all the options valid for python-kafka's KafkaConsumer.
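
The handler receives the consumed record. The sketch below assumes it is a python-kafka ConsumerRecord, which exposes topic, partition, offset, key and value attributes; auto_offset_reset is a standard python-kafka KafkaConsumer option:

from nameko_kafka import consume


class MyService:
    """
        My microservice
    """
    name = "my-service"

    @consume(
        "kafka-topic",
        group_id="my-group",
        bootstrap_servers='localhost:1234',
        auto_offset_reset="earliest",
    )
    def method(self, message):
        # Assuming a python-kafka ConsumerRecord: metadata is available
        # as attributes alongside the payload
        print(message.topic, message.partition, message.offset)
        # The payload is raw bytes unless a value_deserializer is configured
        handle_message(message.value)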

On top of python-kafka's default autocommit feature, the entrypoint also supports three offset commit strategies: at least once, at most once, and exactly once. The three strategies correspond to the different message delivery semantics achievable in Kafka. An example of each is shown in the following subsections.

At Least Once

from nameko_kafka import consume, Semantic


class MyService:
    """
        My microservice 
    """
    name = "my-service"
    
    # At least once semantic consumer
    @consume("kafka-topic", group_id="my-group", bootstrap_servers='localhost:1234', semantic=Semantic.AT_LEAST_ONCE)
    def method(self, message):
        # Your message handler
        handle_message(message) 

At Most Once

from nameko_kafka import consume, Semantic


class MyService:
    """
        My microservice 
    """
    name = "my-service"
    
    # At most once semantic consumer
    @consume("kafka-topic", group_id="my-group", bootstrap_servers='localhost:1234', semantic=Semantic.AT_MOST_ONCE)
    def method(self, message):
        # Your message handler
        handle_message(message) 

Exactly Once

The exactly once semantic requires persistent storage for message offsets. Such a store can be implemented using the OffsetStorage interface provided by Nameko-kafka. Various backends are possible, e.g. RDBMS and NoSQL databases. Support for some comes out of the box:

MongoDB Storage

from nameko_kafka import consume, Semantic
from nameko_kafka.storage import MongoStorage

from pymongo import MongoClient


class MyService:
    """
        My microservice 
    """
    name = "my-service"
    
    # Exactly once semantic consumer
    @consume(
        "kafka-topic", 
        group_id="my-group", 
        bootstrap_servers='localhost:1234', 
        semantic=Semantic.EXACTLY_ONCE,
        storage=MongoStorage(
            # MongoDB backend client
            client=MongoClient('localhost', 27017),
            # Database to use for storage
            db_name="database-name",
            # Collection to use for storage
            collection="collection-name"
        )       
    )
    def method(self, message):
        # Your message handler
        handle_message(message) 

Note: If the db_name and collection arguments are not specified, the storage uses the default values "nameko_kafka_offsets" and "offsets", respectively.
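
With those defaults, the storage declaration reduces to the client argument alone; a minimal sketch, with illustrative connection details:

from nameko_kafka.storage import MongoStorage
from pymongo import MongoClient

# Stores offsets in the "nameko_kafka_offsets" database, "offsets" collection
storage = MongoStorage(client=MongoClient('localhost', 27017))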

SQL Storage

Part of v0.3.0

S3 Storage

Part of v0.4.0

Azure Block Storage

Part of v0.5.0

Create Custom Storage

You can create your own offset storage by implementing the OffsetStorage interface. It exposes the following methods:

from nameko_kafka.storage.base import OffsetStorage

class MyStorage(OffsetStorage):
    """
        My custom offset storage.
    """

    def setup(self):
        """
            Method for setup of the storage.
        """

    def stop(self):
        """
            Method to teardown the storage.
        """

    def read(self, topic, partition):
        """
            Read last stored offset from storage for 
            given topic and partition.

            :param topic: message topic
            :param partition: partition number of the topic
            :returns: last committed offset value
        """

    def write(self, offsets):
        """
            Write offsets to storage.

            :param offsets: mapping between topic-partition
                tuples and corresponding latest offset value, 
                e.g.
                {
                    ("topic-1", 0): 1,
                    ("topic-1", 1): 3,
                    ("topic-2", 1): 10,
                    ...
                }
        """

Configurations

The extension can be configured through a nameko config.yaml file or through environment variables.

Config File

# Config for entrypoint
KAFKA_CONSUMER:
  bootstrap_servers: 'localhost:1234'
  retry_backoff_ms: 100
  ...

# Config for dependency
KAFKA_PRODUCER:
  bootstrap_servers: 'localhost:1234'
  retries: 3
  ...

Environment Variables

# Config for entrypoint
KAFKA_CONSUMER='{"bootstrap_servers": "localhost:1234", "retry_backoff_ms": 100}'

# Config for dependency
KAFKA_PRODUCER='{"bootstrap_servers": "localhost:1234", "retries": 3}'
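
Nameko loads the YAML file when the service is started with the --config flag; the service module name below is illustrative:

$ nameko run --config config.yaml my_service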

Milestones

  • Kafka Entrypoint
  • Kafka Dependency
  • Commit strategies:
    • AT_MOST_ONCE_DELIVERY
    • AT_LEAST_ONCE_DELIVERY
    • EXACTLY_ONCE_DELIVERY
  • Commit storage for EXACTLY_ONCE_DELIVERY strategy

Developers

For development, a Kafka broker is required. You can spawn one using the docker-compose.yml file in the tests folder:

$ cd tests
$ docker-compose up -d 

To install all package dependencies:

$ pip install ".[dev]"
or
$ make deps

Other useful commands:

$ pytest --cov=nameko_kafka tests/			# to get coverage report
or
$ make coverage

$ pylint nameko_kafka       # to check code quality with PyLint
or
$ make lint

Contributions

Issue reports and pull requests are always welcome. Thanks!