
Kafka Connect

Introduction

A Kafka Connect plugin for transferring data between Crux nodes and Kafka.

The Crux source connector publishes transactions on a node to a Kafka topic, and the sink connector receives transactions from a Kafka topic and submits them to a node.

Table 1. Currently supported data formats

  Data format   Sink/Source
  JSON          Both
  Avro          Sink
  Transit       Source
  EDN           Both

To get started with the connector, there are two separate guides (depending on whether you are using a full Confluent Platform installation, or a basic Kafka installation):

Confluent Platform Quickstart

Installing the connector

Use confluent-hub install juxt/kafka-connect-crux:21.05-1.17.0-beta to download and install the connector from Confluent Hub. The downloaded connector is placed within your Confluent installation's 'share/confluent-hub-components' folder.

The connector can be used as either a source or a sink. In either case, there should be an associated Crux node to communicate with.

Creating the Crux node

To use our connector, you must first have a Crux node connected to Kafka. To do this, we start by adding the following dependencies to a project:

juxt/crux-core {:mvn/version "21.05-1.17.0-beta"}
juxt/crux-kafka {:mvn/version "21.05-1.17.0-beta"}
juxt/crux-http-server {:mvn/version "21.05-1.17.0-alpha"}
juxt/crux-rocksdb {:mvn/version "21.05-1.17.0-beta"}
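
In a deps.edn project, for example, these coordinates sit under the :deps key (a minimal sketch; project name and layout are up to you):

```clojure
;; deps.edn -- minimal sketch using the coordinates above
{:deps {juxt/crux-core        {:mvn/version "21.05-1.17.0-beta"}
        juxt/crux-kafka       {:mvn/version "21.05-1.17.0-beta"}
        juxt/crux-http-server {:mvn/version "21.05-1.17.0-alpha"}
        juxt/crux-rocksdb     {:mvn/version "21.05-1.17.0-beta"}}}
```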

Ensure first that you have a running Kafka broker to connect to. Require the dependencies in a file or REPL, then create a Kafka-connected node with an associated HTTP server for the connector to communicate with:

(require '[crux.api :as crux]
         '[crux.http-server :as srv])
(import (crux.api ICruxAPI))

(def ^crux.api.ICruxAPI node
  (crux/start-node {:crux.node/topology '[crux.kafka/topology crux.http-server/module]
                    :crux.kafka/bootstrap-servers "localhost:9092"
                    :crux.http-server/port 3000}))

Sink Connector

Run the following command within the base of the Confluent folder, to create a worker which connects to the 'connect-test' topic, ready to send messages to the node. This also makes use of connect-file-source, checking for changes in a file called 'test.txt':

./bin/connect-standalone etc/kafka/connect-standalone.properties share/confluent-hub-components/juxt-kafka-connect-crux/etc/local-crux-sink.properties etc/kafka/connect-file-source.properties

Run the following within your Confluent directory, to add a line of JSON to 'test.txt':

echo '{"crux.db/id": "415c45c9-7cbe-4660-801b-dab9edc60c84", "value": "baz"}' >> test.txt

Now, verify that this was transacted within your REPL:

(crux/entity (crux/db node) "415c45c9-7cbe-4660-801b-dab9edc60c84")
==>
{:crux.db/id #uuid "415c45c9-7cbe-4660-801b-dab9edc60c84", :value "baz"}

Source Connector

Run the following command within the base of the Confluent folder, to create a worker which connects to the 'connect-test' topic, ready to receive messages from the node. This also makes use of 'connect-file-sink', writing the transactions published by your node to 'test.sink.txt':

./bin/connect-standalone etc/kafka/connect-standalone.properties share/confluent-hub-components/juxt-kafka-connect-crux/etc/local-crux-source.properties etc/kafka/connect-file-sink.properties

Within your REPL, transact an element into Crux:

(crux/submit-tx node [[:crux.tx/put {:crux.db/id #uuid "415c45c9-7cbe-4660-801b-dab9edc60c82", :value "baz-source"}]])

Check the contents of 'test.sink.txt' using the command below, and you should see that the transaction was output to the 'connect-test' topic:

tail test.sink.txt
==>
[[:crux.tx/put {:crux.db/id #uuid "415c45c9-7cbe-4660-801b-dab9edc60c82", :value "baz-source"} #inst "2019-09-19T12:31:21.342-00:00"]]

Kafka Quickstart

Installing the connector

Download the connector from Confluent Hub, then unzip the downloaded archive:

unzip juxt-kafka-connect-crux-21.05-1.17.0-beta.zip

Navigate into the base of the Kafka folder, then run the following commands:

cp $CONNECTOR_PATH/lib/*-standalone.jar $KAFKA_HOME/libs
cp $CONNECTOR_PATH/etc/*.properties $KAFKA_HOME/config

The connector can be used as either a source or a sink. In either case, there should be an associated Crux node to communicate with.

Creating the Crux node

To use our connector, you must first have a Crux node connected to Kafka. To do this, we start by adding the following dependencies to a project:

juxt/crux-core {:mvn/version "21.05-1.17.0-beta"}
juxt/crux-kafka {:mvn/version "21.05-1.17.0-beta"}
juxt/crux-http-server {:mvn/version "21.05-1.17.0-alpha"}
juxt/crux-rocksdb {:mvn/version "21.05-1.17.0-beta"}

Ensure first that you have a running Kafka broker to connect to. Require the dependencies in a file or REPL, then create a Kafka-connected node with an associated HTTP server for the connector to communicate with:

(require '[crux.api :as crux]
         '[crux.http-server :as srv])
(import (crux.api ICruxAPI))

(def ^crux.api.ICruxAPI node
  (crux/start-node {:crux.node/topology '[crux.kafka/topology crux.http-server/module]
                    :crux.kafka/bootstrap-servers "localhost:9092"
                    :crux.http-server/port 3000}))

Sink Connector

Run the following command within the base of the Kafka folder, to create a worker which connects to the 'connect-test' topic, ready to send messages to the node. This also makes use of connect-file-source, checking for changes in a file called 'test.txt':

./bin/connect-standalone.sh config/connect-standalone.properties config/local-crux-sink.properties config/connect-file-source.properties

Run the following within your Kafka directory, to add a line of JSON to 'test.txt':

echo '{"crux.db/id": "415c45c9-7cbe-4660-801b-dab9edc60c84", "value": "baz"}' >> test.txt

Now, verify that this was transacted within your REPL:

(crux/entity (crux/db node) "415c45c9-7cbe-4660-801b-dab9edc60c84")
==>
{:crux.db/id #uuid "415c45c9-7cbe-4660-801b-dab9edc60c84", :value "baz"}

Source Connector

Run the following command within the base of the Kafka folder, to create a worker which connects to the 'connect-test' topic, ready to receive messages from the node. This also makes use of 'connect-file-sink', writing the transactions published by your node to 'test.sink.txt':

./bin/connect-standalone.sh config/connect-standalone.properties config/local-crux-source.properties config/connect-file-sink.properties

Within your REPL, transact an element into Crux:

(crux/submit-tx node [[:crux.tx/put {:crux.db/id #uuid "415c45c9-7cbe-4660-801b-dab9edc60c82", :value "baz-source"}]])

Check the contents of 'test.sink.txt' using the command below, and you should see that the transaction was output to the 'connect-test' topic:

tail test.sink.txt
==>
[[:crux.tx/put {:crux.db/id #uuid "415c45c9-7cbe-4660-801b-dab9edc60c82", :value "baz-source"} #inst "2019-09-19T12:31:21.342-00:00"]]

Source Configuration

url
  • Destination URL of the Crux HTTP endpoint

  • Type: String

  • Importance: High

  • Default: "http://localhost:3000"

topic
  • The Kafka topic to publish data to

  • Type: String

  • Importance: High

  • Default: "connect-test"

format
  • Format to send data out as: edn, json or transit

  • Type: String

  • Importance: Low

  • Default: "edn"

mode
  • Mode to use: tx or doc

  • Type: String

  • Importance: Low

  • Default: "tx"

batch.size
  • The maximum number of records the Source task can read from Crux at one time.

  • Type: Int

  • Importance: Low

  • Default: 2000
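
Putting the options above together, a standalone source connector properties file might look like the following. This is a hypothetical sketch (cf. the bundled local-crux-source.properties): the name, tasks.max, and connector.class values are assumptions — check the properties files shipped with the connector for the exact class name.

```properties
# Hypothetical standalone source connector config -- the
# connector.class value is an assumption; the url, topic, format,
# mode, and batch.size keys are those documented above.
name=local-crux-source
connector.class=crux.kafka.connect.CruxSourceConnector
tasks.max=1
url=http://localhost:3000
topic=connect-test
format=edn
mode=tx
batch.size=2000
```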

Sink Configuration

url
  • Destination URL of the Crux HTTP endpoint

  • Type: String

  • Importance: High

  • Default: "http://localhost:3000"

id.key
  • Record key to use as :crux.db/id

  • Type: String

  • Importance: Low

  • Default: "crux.db/id"
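
Analogously, a standalone sink connector properties file might look like the following sketch. The name, tasks.max, and connector.class values are assumptions, and topics is the standard Kafka Connect sink property naming the topic(s) to consume — check the bundled properties files for the exact class name.

```properties
# Hypothetical standalone sink connector config -- the
# connector.class value is an assumption; url and id.key are the
# keys documented above.
name=local-crux-sink
connector.class=crux.kafka.connect.CruxSinkConnector
tasks.max=1
url=http://localhost:3000
topics=connect-test
id.key=crux.db/id
```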