4.5 Kafka Adapter

Introduction:

  • The source of an event can be internal or external. Events can be generated by a user (for example a mouse click or keystroke), by an external source such as a sensor, or by the system itself, for example when a program is loaded.
  • Event-based messaging is the modern way for different applications to communicate. It has two components, publish and subscribe; Netflix and YouTube are familiar examples.
  • An event is any significant occurrence or change in state of system hardware or software. An event is not the same as an event notification, which is a message sent by the system to notify another part of the system that an event has taken place.

The Kafka Adapter is based on event-based communication. Event-based communication has two main components: publish (producer) and subscribe (consumer). The publisher writes a message to a queue or topic on a middleware broker such as Kafka or Pulsar; the broker stores the message, follows the publish/subscribe principle, and delivers it to every application that has subscribed. For example, Netflix plays the role of the publisher and we, the viewers, play the role of the subscriber (consumer). The Kafka Adapter covered in this tutorial works the same way.

Event-based Communication:

  • An event-driven architecture can help organizations achieve a flexible system that can adapt to changes and make decisions in real-time. Real-time situational awareness means that business decisions, whether manual or automated, can be made using all of the available data that reflects the current state of your systems.
  • Events are captured as they occur from event sources such as Internet of Things (IoT) devices, applications, and networks, allowing event producers and event consumers to share status and response information in real-time.
  • Organizations can add event-driven architecture to their systems and applications to improve the scalability and responsiveness of applications and access to the data and context needed for better business decisions.
  • This is the modern way of communication between different applications. It has two components, publish and subscribe; Netflix and YouTube are examples.
  • See the picture below for reference. SAP is the producer that pushes messages, Kafka or Pulsar acts as the middleware and maintains the messages, and all the other applications such as Twitter, HubSpot, and Salesforce are consumers or subscribers. SAP only needs to publish once; it does not need to send the data to each application individually.

How does event-driven architecture work?

SKYVVA Integration is a comprehensive set of integration and messaging technologies to connect applications and data across hybrid infrastructures. It is an agile, distributed, containerized, and API-centric solution. It provides service composition and orchestration, application connectivity and data transformation, real-time message streaming, change data capture, and API management—all combined with a cloud-native platform and toolchain to support the full spectrum of modern application development.

  • The event-driven architecture is made up of event producers and event consumers. An event producer detects or senses an event and represents the event as a message. It does not know the consumer of the event or the outcome of an event.
  • After an event has been detected, it is transmitted from the event producer to the event consumers through event channels, where an event processing platform processes the event synchronously. Event consumers need to be informed when an event has occurred. They might process the event or may only be impacted by it.
  • The event processing platform will execute the correct response to an event and send the activity downstream to the right consumers. This downstream activity is where the outcome of an event is seen.

  • One more example for better understanding.

We all use Netflix to watch movies. Netflix uses queuing and streaming tools: Netflix plays the role of the producer and we play the role of the consumer. Netflix uploads a movie once, and everyone who subscribes to Netflix can watch it.

The subscribers do not need to request it again and again. Sending the content to each subscriber individually would take a lot of time; here it only needs to be uploaded once and all subscribers receive it.

Event-Driven Technology Terminology:

We have five main event terms:

  1. Publish
  2. Subscribe
  3. Streaming
  4. Producer
  5. Consumer

Check the picture below for a description.

Kafka Adapter:

We have two Kafka Adapters:

  • Outbound/Producer Kafka Adapter:

We need the outbound Kafka adapter when we want to send data out of Salesforce. Here we need to create a topic first. From Salesforce we send data messages to our Agent's API endpoint, and a Camel producer then publishes them into the topic (for example invoice) on the Kafka side. This is the outbound process because we are sending data out; the adapter plays the role of the producer.

  • Inbound/Consumer Kafka Adapter:

We use the inbound Kafka adapter when somebody publishes a message into a topic (for example customer) on the Kafka side and we consume it. We have an event-driven listener, which is a Camel consumer, so we do not need a scheduler and can consume immediately. This is the inbound process because we are receiving data; the adapter plays the role of the consumer.

Check the picture given below for reference.

To configure the adapter, we first need to learn and follow these steps:

1. How to configure the new Agent Control Board.

  • Go to the Integration Detail page.
  • Click on the new agent control board.

Click on Agent Control Board Tab

  –How to use the Agent Control Board?

Once the user fully understands how to use the Agent Control Board by following the link above, and understands the architecture of the Agent Control Board, the steps below are carried out in the Salesforce org.

2. How to check the caching in the new Agent Control Board.

We can check the cache for different object names.

  • Check for the Integration. For that select Integration in the object name.

  • Check for the Interface. For that select Interface in the object name.

  • Check for the Mapping. For that select Mapping in the object name.

  • Check for the Adapter. For that select Adapter in the object name.

  • Check for the Interface Group. For that select Interface Group in the object name.

3. How does Kafka work?

Kafka Startup:

A script that starts Kafka and creates 2 sample topics is available on the EC2 instance in the home directory /home/ec2-user.

[ec2-user@ip-172-20-63-98 ~]$ sh startKafka.sh

Starting Zookeeper

Zookeeper started successfully!

Starting Kafka server

Kafka server started successfully!

Creating producerTopic

OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N

producerTopic created successfully!

Creating consumerTopic

OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N

consumerTopic created successfully!

[ec2-user@ip-172-20-63-98 ~]$

  • Ignore the warnings. If the above command throws an error saying that the topic already exists, we can ignore that as well.

Stopping Kafka

  • We can find a script to stop Kafka in the home directory as well.

/home/ec2-user

[ec2-user@ip-172-20-63-98 ~]$ sh stopKafka.sh

-rw-rw-r-- 1 ec2-user ec2-user 75 May 29 16:26 stopKafka.sh

We can stop Kafka & Zookeeper by invoking the above script using the following command.

sh stopKafka.sh

  • Create a Kafka topic

We can create a Kafka Topic by invoking the following command.

${KAFKA_HOME}/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic <GiveTopicNameHere>

[ec2-user@ip-172-20-63-98 ~]$ ${KAFKA_HOME}/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic testTopic

 OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N

[ec2-user@ip-172-20-63-98 ~]$

  • The above command has created a topic named testTopic.
  • We can verify all the topics available in the Kafka server by invoking the following command.

${KAFKA_HOME}/bin/kafka-topics.sh --zookeeper localhost:2181 --list

[ec2-user@ip-172-20-63-98 ~]$ ${KAFKA_HOME}/bin/kafka-topics.sh --zookeeper localhost:2181 --list

OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N

consumerTopic

producerTopic

testTopic
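We can also inspect an individual topic, for example to confirm its partition count and replication factor. The --describe option of kafka-topics.sh is a standard Kafka utility option (the exact output depends on the Kafka version); for the testTopic created above:

${KAFKA_HOME}/bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic testTopic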

Command-line Consumer

Kafka provides a utility to invoke a command-line consumer. This consumer will consume a Kafka Topic and print the messages into the command line, confirming that the messages were posted to the right topic.

Invoke the following command to consume from a Kafka Topic.

${KAFKA_HOME}/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic testTopic --from-beginning

  • We can see the messages as the output from the above command.
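For a quick end-to-end check we can push a test message into the same topic from a second terminal and watch it appear in the consumer output. kafka-console-producer.sh is the same standard Kafka utility used later in this guide:

echo "hello from the command line" | ${KAFKA_HOME}/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic testTopic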
Note
The Kafka outbound adapter supports CSV, JSON, and XML. The Kafka inbound adapter supports CSV and XML.

Agent Kafka adapter for producer

The Agent Kafka adapter for the producer is used to write data to a topic, i.e. to send data from Salesforce to a topic in Apache Kafka. To use Kafka as a producer we need a running Kafka installation, and we need to configure an adapter in the Agent Control Board. In this chapter we will learn how to use the Agent Kafka adapter for the producer and how to do outbound processing with the CSV, XML, and JSON file types.

Case 1

Create Outbound KAFKA Adapter for CSV format.

  1. Firstly, we need to create a Kafka topic on the Kafka server.
  2. Create a Kafka topic named: CSV-Topic (As shown in the picture given below).

3. We should create a separate topic for each data format. (The other topics are needed for the later use cases; for now we are using CSV-Topic.)

  • ${KAFKA_HOME}/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic CSV-Topic
  • ${KAFKA_HOME}/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic XML-Topic
  • ${KAFKA_HOME}/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic JSON-Topic

4. After creating the topics we can use the command below to list all Kafka topics.

  • ${KAFKA_HOME}/bin/kafka-topics.sh --zookeeper localhost:2181 --list

5. Go to the new Agent Control Board => Ping Agent Connection.

6. Create KAFKA Outbound Adapter for testing with CSV format.

  • Click on New as shown below.

 

  • Fill all the required fields as shown below.
  • Save it.

 

Note
CSV files support flat mapping, and XML/JSON support hierarchical mapping.

7. Create a message type using the XML below.


<?xml version="1.0" encoding="UTF-8"?>

<AccountTestV3 xmlns="http://schema.infor.com/InforOAGIS/2">

<AccountNumber>100001</AccountNumber>

<Name>TestAccountA1</Name>

<BillingCountry>CambodiaA1</BillingCountry>

<BillingCity>PhnomPenhA1</BillingCity>

<Account_ID>100001</Account_ID>

<Description>This tutorial will teach you the basics of XML. The tutorial is divided into sections such as XML Basics Advanced XML and XML tools.</Description>

</AccountTestV3>

8. Create Integration.

9. Create Interface

  • Name
  • Source: Account
  • Status: Deployed
  • Operation: upsert
  • Type: Outbound
  • Processing Mode: Synchronous
Note
Kafka supports only Synchronous Processing Mode 
  • Adapter: add the CSV Kafka outbound adapter, which we created in step 6.
  • Metadata Provider
  • Repository
  • Message Type: add the message type which we created in step 7.
  • Mapping

10. Consume the message sent from Salesforce in CSV format.

${KAFKA_HOME}/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic CSV-Topic
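If the console consumer is only started after the callout has already been sent, the --from-beginning option (the same standard option used earlier in this guide) can be added to re-read messages that are already stored on the topic:

${KAFKA_HOME}/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic CSV-Topic --from-beginning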

11. Callout to Kafka server

12. Here is the result on the server.

 

13. The result in message monitoring.

 

 

14. CSV supports other separators such as semicolon, tab, and pipe; an example with a semicolon separator is shown below.
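Purely as an illustration of the separator (using the sample account records from the inbound examples later in this guide), semicolon-separated data looks like this:

Account_ID;AccountNumber;Name;Billincity;BillingCountry;Description

11;100001;Test KAFKA Account1;#PP1;CAM1;Testing CSV Inbound

12;100002;Test KAFKA Account2;#PP2;CAM2;Testing CSV Inbound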

Case 2

Create Outbound KAFKA Adapter for JSON format.

  1. Create JSON Adapter.

2. Create Integration.

3. Create Interface

  • Name
  • Source: Account
  • Status: Deployed
  • Operation: upsert
  • Type: Outbound
  • Processing Mode: Synchronous
Note
Kafka supports only Synchronous Processing Mode 
  • Adapter: add the JSON Kafka outbound adapter.
  • Metadata Provider
  • Repository
  • Message Type: Add the message type. (Hierarchical structure)
  • Mapping

4. Consume the message sent from Salesforce in JSON format.

${KAFKA_HOME}/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic JSON-Topic
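Purely as an illustration (the real field names and nesting come from the hierarchical message type created above, which is only shown as a screenshot in the original guide), a JSON message on JSON-Topic could look similar to:

{"Account":{"AccountNumber":"100001","Name":"TestAccountA1","BillingCountry":"CambodiaA1","BillingCity":"PhnomPenhA1"}}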

5. Callout to the Kafka server.

6. Here is the result on the server.

7. The result in message monitoring.

Case 3

Create Outbound KAFKA Adapter for XML format.

  • We need to follow the same steps as in Case 1 and Case 2; we just need to create an XML adapter instead.

How to work with Kafka Inbound Adapter?

Case 1: Create Inbound KAFKA Adapter for CSV format.

  1. Create an Inbound Kafka Topic.
  2. Use this command: ${KAFKA_HOME}/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic TestInboundCSVconsumerTopic

3. Go to the new Agent Control Board and create a new Kafka adapter for CSV data format inbound processing.

4. We can run or stop the Kafka inbound adapter in the Adapter screen.

5. Create a message type for CSV Kafka inbound using the CSV data below.


Account_ID,AccountNumber,Name,Billincity,BillingCountry,Description

11,100001,Test KAFKA Account1,#PP1,CAM1,Testing CSV Inbound

12,100002,Test KAFKA Account2,#PP2,CAM2,Testing CSV Inbound

6. Create Integration.

7. Create Interface

  • Name
  • Source: Account
  • Status: Deployed
  • Operation: upsert
  • Type: Inbound
  • Processing Mode: Synchronous
Note
Kafka supports only Synchronous Processing Mode 
  • Adapter: add the CSV Kafka inbound adapter.
  • Metadata Provider
  • Repository
  • Message Type: Add the message type. (Flat structure)
  • Mapping

8. Click the Start Adapter button to start the route.

9. Publish a message to the topic.

$KAFKA_HOME/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic TestInboundCSVconsumerTopic

10. The data format for CSV is given below.


__SKYVVA__START_MESSAGE

Account_ID,AccountNumber,Name,Billincity,BillingCountry,Description

11,100001,Test KAFKA Account1,#PP1,CAM1,Testing CSV Inbound

12,100002,Test KAFKA Account2,#PP2,CAM2,Testing CSV Inbound

13,100003,Test KAFKA Account3,#PP3,CAM3,Testing CSV Inbound

14,100004,Test KAFKA Account4,#PP4,CAM4,Testing CSV Inbound

15,100005,Test KAFKA Account5,#PP5,CAM5,Testing CSV Inbound

16,100006,Test KAFKA Account6,#PP6,CAM6,Testing CSV Inbound

__SKYVVA__END_MESSAGE

 

 

  • Check the result on message monitoring.

Case 2

Create Inbound KAFKA Adapter for XML format.

  • We need to follow the same steps as in Case 1; we just need to create an XML adapter instead.

This feature is really helpful and is a modern way of communication, namely event-based communication. Kafka can handle many terabytes of data without incurring much overhead. Kafka persists the messages on disk and provides intra-cluster replication, which makes it a highly durable messaging system. Kafka replicates data and can support multiple subscribers. Developing a Kafka adapter makes our clients' work easier and more efficient, too.

Agent Kafka adapter for consumer

The Agent Kafka adapter for the consumer is used to read data records from a topic, meaning that we do inbound processing from Kafka to Salesforce. To use the Agent Kafka adapter as a consumer, we need to configure the Agent Control Board and create an Agent Kafka adapter. In the adapter we fill in a specific topic name and one broker to connect to, and Kafka automatically takes care of pulling the data from the right broker into Salesforce. In this tutorial we will learn how to use the Agent Kafka adapter for the consumer with the CSV, XML, and JSON file types.

Prerequisites

  • Configure the new Agent Control Board.
  • Check the cache in the new Agent Control Board.
  • Start Kafka and create topics in the Kafka system.
  • Create an adapter.
  • Create an integration and an interface.

1. How to configure the new Agent Control Board.

  • Go to the Integration Detail page.
  • Click on the new agent control board.

Step1: Click on Agent Control Board Tab

  –How to use the Agent Control Board?

Once the user fully understands how to use the Agent Control Board by following the link above, and understands the architecture of the Agent Control Board, the steps below are carried out in the Salesforce org.

2. How to work with the Kafka Inbound Adapter?

  • We need to create a Topic first.

A topic is a category or feed name to which records are stored and published. All Kafka records are organized into topics. In Kafka we need to create a different topic for each type of record. Example: we have three types of records (CSV, XML, and JSON), so we also need to create three topics, such as TestInboundCSVconsumerTopic, TestInboundXMLconsumerTopic, and TestInboundJsonconsumerTopic.

– here’s a command line to create Topic.

${KAFKA_HOME}/bin/kafka-topics.sh –create –bootstrap-server localhost:9092 –replication-factor 1 –partitions 1 –topic <name>

–  Here’s example.

After we create different topics, we can use the command line to see all the topics that we have in Kafka.

–  here’s the command line to see all the topics in Kafka.

${KAFKA_HOME}/bin/Kafka-topics.sh –zookeeper localhost:2182 –list

–  here’s an example.


Case 1

Create Inbound KAFKA Adapter for CSV format.

1. Go to the new Agent Control Board and create a new Kafka adapter for CSV data format inbound processing.

– Here's the InboundCSV adapter.

2. We can run or stop the Kafka inbound adapter in the Adapter screen.

3. Create a message type for CSV Kafka inbound using the CSV data below.


Account_ID,AccountNumber,Name,Billincity,BillingCountry,Description

11,100001,Test KAFKA Account1,#PP1,CAM1,Testing CSV Inbound

12,100002,Test KAFKA Account2,#PP2,CAM2,Testing CSV Inbound

4. Create Integration.

5. Create Interface.

  • Name
  • Source: Account
  • Status: Deployed
  • Operation: upsert
  • Type: Inbound
  • Processing Mode: Synchronous
Note
Kafka supports only Synchronous Processing Mode 
  • Adapter: add the CSV Kafka inbound adapter.
  • Metadata Provider
  • Repository
  • Message Type: Add the message type. (Flat structure)
  • Mapping

6. Click the Start Adapter button to start the route.

3. How to check the caching in the new Agent Control Board

We can check the cache for different object names, to ensure that the integration, interface, adapter, and mapping are in the cache. If some of these are not in the cache, processing will fail with an error.

  • Check for the Integration. For that, select Integration in the object name.

  • Check for the Interface. For that, select Interface in the object name.

  • Check for the Mapping. For that, select Mapping in the object name.

  • Check for the Adapter. For that, select the Adapter in the object name.

  • Check for the Interface Group. For that, select Interface Group in the object name.

4. How to send data from Kafka to Salesforce?

  1. Publish a message to the topic.

$KAFKA_HOME/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic TestInboundCSVconsumerTopic

2. The CSV data format is given below.

Note
When we want to use CSV records, the payload must be wrapped between __SKYVVA__START_MESSAGE and __SKYVVA__END_MESSAGE.

__SKYVVA__START_MESSAGE

Account_ID,AccountNumber,Name,Billincity,BillingCountry,Description

11,100001,Test KAFKA Account1,#PP1,CAM1,Testing CSV Inbound

12,100002,Test KAFKA Account2,#PP2,CAM2,Testing CSV Inbound

13,100003,Test KAFKA Account3,#PP3,CAM3,Testing CSV Inbound

14,100004,Test KAFKA Account4,#PP4,CAM4,Testing CSV Inbound

15,100005,Test KAFKA Account5,#PP5,CAM5,Testing CSV Inbound

16,100006,Test KAFKA Account6,#PP6,CAM6,Testing CSV Inbound

__SKYVVA__END_MESSAGE

  • Check the result on message monitoring.


Case 2

Create Inbound KAFKA Adapter for XML format.

  1. Create an inbound interface with an XML message type.
  • Here's the XML message type; we need to put a namespace on the first level.

  • Here’s the Inbound Interface.

  • Here’s the mapping.

2. We need to create a Kafka XML adapter.

Go to the Agent Control Board => click Adapter => click the New button. (The configuration is the same as in Case 1, but choose file type XML.)

  •  Here’s the XML adapter Kafka.

3. Click the button start Adapter to start the route.

4. Linked the adapter on Interface.

5. Public message to the topic

$KAFKA_HOME/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic TestInboundXMLconsumerTopic

  • Here’s an example below.

Note
When we want to use XML records, we don't need the start and end message markers, but we do need to convert the XML data into one line. Also, make sure that the data and the message type have the same structure.
  • Here's the XML converted to one line.

<?xml version="1.0" encoding="UTF-8"?><Root xmlns="http://www.w3.org/TR/html4/"><Account><BillingCountry>pp</BillingCountry><BillingCity>Khmer</BillingCity><Name>Meas</Name><Contact><Email>test2@gmail.com</Email><FirstName>Manuroth</FirstName><LastName>ma</LastName></Contact></Account><Account><BillingCountry>pp</BillingCountry><BillingCity>Khmer</BillingCity><Name>Golden</Name><Contact><Email>test2@gmail.com</Email><FirstName>Manuroth</FirstName><LastName>gaga</LastName></Contact></Account><Account><BillingCountry>pp</BillingCountry><BillingCity>Khmer</BillingCity><Name>Audan</Name><Contact><Email>test2@gmail.com</Email><FirstName>Pi</FirstName><LastName>Audan</LastName></Contact></Account><Account><BillingCountry>pp</BillingCountry><BillingCity>Khmer</BillingCity><Name>ko131</Name><Contact><Email>test2@gmail.com</Email><FirstName>Manuroth</FirstName><LastName>chita</LastName></Contact></Account></Root>

  • Here’s the example to public.

  • Here’s the result of XML

Case 3: Create an Inbound KAFKA Adapter for JSON format.

  1. Create an inbound interface with a message type.
  • Here's the message type.

  • Here's the inbound interface with the message type.

  • Do the mapping.

2. We need to create a Kafka adapter of type JSON.

Go to the Agent Control Board => click Adapter => click the New button. (The configuration is the same as in Case 1, but choose file type JSON.)

  • Here’s JSON Adapter Kafka.

  • Click the button Start adapter.

3. Linked adapter with the Interface.

4.Public message to the topic.

$KAFKA_HOME/bin/kafka-console-producer.sh –broker-list localhost:9092 –topic RothJsonInbound

Note
We need to convert the JSON data to a single line.
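Purely as an illustration (the field names must match the JSON message type created above, which is only shown as a screenshot in the original guide), a single-line JSON payload pasted into the console producer could look similar to the record used in the XML case:

{"Account":{"BillingCountry":"pp","BillingCity":"Khmer","Name":"Meas","Contact":{"Email":"test2@gmail.com","FirstName":"Manuroth","LastName":"ma"}}}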

  • Go to the monitor to see the result.

The Agent Kafka consumer is used to read data from a topic into Salesforce, which means we send data from a topic in Kafka to Salesforce. With this adapter we can do inbound processing with CSV, XML, and JSON data records, but we need to make sure our adapter is configured to match the file type.
