Upsun User Documentation

Kafka (Message queue service)

Apache Kafka is an open-source stream-processing software platform.

It is a framework for storing, reading and analyzing streaming data. See the Kafka documentation for more information.

Supported versions

You can select the major and minor version.

Patch versions are applied periodically for bug fixes and the like. When you deploy your app, you always get the latest available patches.

  • 3.7
  • 3.6
  • 3.4
  • 3.2

Relationship reference

For each service defined via a relationship to your application, Upsun automatically generates corresponding environment variables within your application container, in the $<RELATIONSHIP-NAME>_<SERVICE-PROPERTY> format.

The following example shows the information available through the service environment variables themselves, or through the PLATFORM_RELATIONSHIPS environment variable.

You can obtain the complete list of available service environment variables in your app container by running upsun ssh env.

Note that the information about the relationship can change when an app is redeployed or restarted, or when the relationship is changed. Your apps should therefore read the service environment variables at runtime rather than hard-coding any of their values.

KAFKA_SERVICE=kafka
KAFKA_IP=123.456.78.90
KAFKA_HOSTNAME=azertyuiopqsdfghjklm.kafka.service._.eu-1.platformsh.site
KAFKA_CLUSTER=azertyuiop-main-7rqtwti
KAFKA_HOST=kafka.internal
KAFKA_REL=kafka
KAFKA_SCHEME=kafka
KAFKA_TYPE=kafka:3.7
KAFKA_PORT=9092
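
As a minimal sketch of reading these variables from application code, the following Python helper builds a host:port address from them. The function name kafka_bootstrap_server and its parameters are hypothetical, and the sketch assumes the variable prefix is simply the upper-cased relationship name, as in the KAFKA_* example above.

```python
import os


def kafka_bootstrap_server(relationship="kafka", environ=None):
    """Build a "host:port" string from the service environment variables."""
    # Allow a custom mapping to be passed in (useful for local testing);
    # fall back to the real process environment otherwise.
    env = os.environ if environ is None else environ
    # Assumption: the prefix is the upper-cased relationship name,
    # matching the KAFKA_* variables in the example above.
    prefix = relationship.upper()
    host = env[f"{prefix}_HOST"]
    port = int(env[f"{prefix}_PORT"])
    return f"{host}:{port}"
```

Inside the app container, calling kafka_bootstrap_server() with no arguments would read KAFKA_HOST and KAFKA_PORT from the environment; a missing variable raises a KeyError rather than silently falling back to a hard-coded value.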

For some advanced use cases, you can use the PLATFORM_RELATIONSHIPS environment variable. The structure of the PLATFORM_RELATIONSHIPS environment variable can be obtained by running upsun relationships in your terminal:

{
    "service": "kafka",
    "ip": "123.456.78.90",
    "hostname": "azertyuiopqsdfghjklm.kafka.service._.eu-1.platformsh.site",
    "cluster": "azertyuiop-main-7rqtwti",
    "host": "kafka.internal",
    "rel": "kafka",
    "scheme": "kafka",
    "type": "kafka:3.7",
    "port": 9092
}

Here is an example of how to gather PLATFORM_RELATIONSHIPS environment variable information in a .environment file:

.environment
# Decode the built-in credentials object variable.
export RELATIONSHIPS_JSON=$(echo $PLATFORM_RELATIONSHIPS | base64 --decode)

# Set environment variables for individual credentials.
export APP_KAFKA_HOST="$(echo $RELATIONSHIPS_JSON | jq -r '.kafka[0].host')"
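
The same decoding can be done from application code. The sketch below mirrors the shell example in Python, assuming (as the jq expression '.kafka[0].host' implies) that the decoded value maps each relationship name to a list of endpoint objects. The function names are hypothetical.

```python
import base64
import json


def decode_relationships(encoded):
    """Decode a base64-encoded JSON document into a Python dict."""
    return json.loads(base64.b64decode(encoded).decode("utf-8"))


def kafka_endpoint(relationships, name="kafka"):
    """Return (host, port) for the first endpoint of the named relationship."""
    # Assumption: each relationship name maps to a list of endpoints,
    # and the first entry is the one to use (same as '.kafka[0]' in jq).
    endpoint = relationships[name][0]
    return endpoint["host"], endpoint["port"]


# Typical use inside the app container:
#   import os
#   host, port = kafka_endpoint(
#       decode_relationships(os.environ["PLATFORM_RELATIONSHIPS"]))
```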

Usage example

1. Configure the service

To define the service, use the kafka type:

.upsun/config.yaml
services:
    # The name of the service container. Must be unique within a project.
    <SERVICE_NAME>:
        type: kafka:<VERSION>

Note that changing the name of the service replaces it with a brand new service and all existing data is lost. Back up your data before changing the service.

2. Add the relationship

To define the relationship, use the following configuration:

.upsun/config.yaml
applications:
    # The name of the app container. Must be unique within a project.
    <APP_NAME>:
        # Relationships enable access from this app to a given service.
        # The example below shows simplified configuration leveraging a default service
        # (identified from the relationship name) and a default endpoint.
        # See the Application reference for all options for defining relationships and endpoints.
        relationships:
            <SERVICE_NAME>: 
services:
    # The name of the service container. Must be unique within a project.
    <SERVICE_NAME>:
        type: kafka:<VERSION>

You can define <SERVICE_NAME> as you like, so long as it's unique among all defined services and matches in both the application and services configuration.

The example above leverages default endpoint configuration for relationships. That is, it uses default endpoints behind the scenes, providing a relationship (the network address a service is accessible from) whose name is identical to the name of that service.

Depending on your needs, instead of default endpoint configuration, you can use explicit endpoint configuration.

With the above definition, the application container (<APP_NAME>) now has access to the service via the relationship <SERVICE_NAME> (with default endpoint configuration, the relationship name is the same as the service name) and its corresponding service environment variables.

Example configuration

.upsun/config.yaml
applications:
    # The name of the app container. Must be unique within a project.
    myapp:
        # Relationships enable access from this app to a given service.
        # The example below shows simplified configuration leveraging a default service
        # (identified from the relationship name) and a default endpoint.
        # See the Application reference for all options for defining relationships and endpoints.
        relationships:
            kafka: 

services:
    # The name of the service container. Must be unique within a project.
    kafka:
        type: kafka:3.7

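Use in app

To use the configured service in your app, add code similar to the following to your project. The first example is in Python, using the kafka-python client together with the platformshconfig helper; the second is in Ruby, using the ruby-kafka gem.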

from json import dumps
from json import loads
from kafka import KafkaConsumer, KafkaProducer
from platformshconfig import Config


def usage_example():
    # Create a new Config object to ease reading the Platform.sh environment variables.
    # You can alternatively use os.environ yourself.
    config = Config()
    # Get the credentials to connect to the Kafka service.
    credentials = config.credentials('kafka')
    
    try:
        kafka_server = '{}:{}'.format(credentials['host'], credentials['port'])
        
        # Producer
        producer = KafkaProducer(
            bootstrap_servers=[kafka_server],
            value_serializer=lambda x: dumps(x).encode('utf-8')
        )
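        for e in range(10):
            data = {'number' : e}
            producer.send('numtest', value=data)
        # send() is asynchronous; flush so the messages are actually delivered
        # before the consumer starts reading.
        producer.flush()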
        
        # Consumer
        consumer = KafkaConsumer(
            bootstrap_servers=[kafka_server],
            auto_offset_reset='earliest'
        )
        
        consumer.subscribe(['numtest'])
        
        output = ''
        # For demonstration purposes so it doesn't block.
        for e in range(10):
            message = next(consumer)
            output += str(loads(message.value.decode('UTF-8'))["number"]) + ', '

        # What a real implementation would do instead.
        # for message in consumer:
        #     output += str(loads(message.value.decode('UTF-8'))["number"]) + ', '

        return output
    
    except Exception as e:
        return e
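
The decoding step in the consumer loop above can also be separated from the Kafka client so that it can be exercised without a running broker. The following is a minimal sketch under that assumption: collect_numbers and FakeMessage are hypothetical names, and FakeMessage only stands in for the message records a consumer yields (objects with a bytes .value attribute).

```python
import json
from collections import namedtuple
from itertools import islice


def collect_numbers(messages, limit=10):
    """Decode up to `limit` messages and join their "number" fields."""
    numbers = []
    # islice stops after `limit` items, so an endless stream does not
    # keep the loop running forever once enough messages have arrived.
    for message in islice(messages, limit):
        payload = json.loads(message.value.decode("utf-8"))
        numbers.append(str(payload["number"]))
    return ", ".join(numbers)


# Stand-in for consumer records, used only for this demonstration.
FakeMessage = namedtuple("FakeMessage", "value")
sample = [FakeMessage(json.dumps({"number": n}).encode("utf-8")) for n in range(3)]
result = collect_numbers(sample)
```

Here result is the string "0, 1, 2". Passing the consumer itself as messages is assumed to work like any other iterator, though the call still blocks until limit messages have been received.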

## With the ruby-kafka gem

# Producer
require "kafka"
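# Read the address from the service environment variables instead of hard-coding it.
# Assumes the relationship is named "kafka", so KAFKA_HOST and KAFKA_PORT are set.
kafka = Kafka.new(["#{ENV['KAFKA_HOST']}:#{ENV['KAFKA_PORT']}"], client_id: "my-application")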
kafka.deliver_message("Hello, World!", topic: "greetings")

# Consumer
kafka.each_message(topic: "greetings") do |message|
  puts message.offset, message.key, message.value
end
