Apache Pulsar
Basics
-
Pulsar gives you the ability to subscribe a pipeline to a topic
-
Topics belong to namespaces
-
At the namespace level, you can specify retention policies and schema enforcement
-
Namespaces belong to tenants
-
Tenants can specify authorization/authentication
-
Highlights
-
Supports native streaming transformations through Pulsar Functions
-
Uses Apache ZooKeeper for configuration management
-
Uses Apache BookKeeper for durable storage
-
Each document processed has a result object persisted to another topic
Overview
Apache Pulsar is a message bus that combines queuing with publishing-subscribing features for server-to-server messaging. A producer publishes a message to a topic and a consumer subscribes to that topic using a subscription type to receive the topic messages.
As of Fusion 5.4, the subscription type of an existing subscription can no longer be changed through the UI or API. To change the subscription type, delete the existing subscription and create a new one.
Thanks to Pulsar’s messaging guarantees (and the durable storage provided by Apache BookKeeper), messages that have not been acknowledged by a consumer are persisted and redelivered. This means that saves to the crawl-db collection can be done once, when an item is completed (instead of the item being stored in Solr first, queried for, and then fetched).
Apache Pulsar is a multi-tenant messaging system. Several tenants can use the message bus concurrently. Each tenant can have multiple namespaces and each namespace can have multiple topics.
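The tenant, namespace, and topic hierarchy is reflected in how Pulsar addresses a topic. The following is a minimal sketch that builds a fully qualified topic name in the form Pulsar uses, domain://tenant/namespace/topic. The function name and the example tenant, namespace, and topic names are hypothetical and are not taken from Fusion.

```python
def topic_name(tenant: str, namespace: str, topic: str, persistent: bool = True) -> str:
    """Build a fully qualified Pulsar topic name from its three levels."""
    for part in (tenant, namespace, topic):
        # Each level is a single path segment, so it cannot be empty or contain "/".
        if not part or "/" in part:
            raise ValueError(f"invalid name component: {part!r}")
    domain = "persistent" if persistent else "non-persistent"
    return f"{domain}://{tenant}/{namespace}/{topic}"


# Hypothetical names, for illustration only.
print(topic_name("example-tenant", "example-app", "example-topic"))
```

Two topics with the same short name in different namespaces get different fully qualified names, which is what lets several tenants share one message bus.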
Even though Pulsar itself is capable of moving data between clusters, Fusion does not support configuring Apache Pulsar to perform geo-replication.
Types of server-to-plugin calls:
-
Broadcast (pub/sub). The server publishes a message to a topic. Each plugin instance consumes the message through its own unique, exclusive subscription. In this scenario, all plugin instances for a given configId consume the same message. Examples include job actions (start, stop) and health status requests.
-
Queueing. The server publishes messages to a topic, where plugins consume through a shared subscription. Subscriptions that are shared allow multiple consumers to subscribe and consume messages one by one. Shared subscriptions are needed for distributed fetching.
-
Failover. The connector job service processes messages from a plugin output topic. This is like an exclusive subscription, in that only one consumer receives messages at a time, but it allows another server instance to take over if the current instance fails.
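The three call types differ only in which consumers receive a given message. The following is a simplified, in-memory sketch of that difference. It does not use the Pulsar client; the function names and the plugin and server names are hypothetical, and the failover function assumes that the first consumer that has not failed is the active one.

```python
from itertools import cycle


def broadcast(messages, consumers):
    """Each consumer has its own exclusive subscription, so each sees every message."""
    return {c: list(messages) for c in consumers}


def shared(messages, consumers):
    """One shared subscription: messages are handed out one by one, round-robin."""
    received = {c: [] for c in consumers}
    for msg, consumer in zip(messages, cycle(consumers)):
        received[consumer].append(msg)
    return received


def failover(messages, consumers, failed=()):
    """One subscription with one active consumer; the next live consumer takes over."""
    received = {c: [] for c in consumers}
    active = next(c for c in consumers if c not in failed)
    received[active].extend(messages)
    return received


print(broadcast(["start"], ["plugin-0", "plugin-1"]))
print(shared(["doc-1", "doc-2", "doc-3"], ["plugin-0", "plugin-1"]))
print(failover(["result-1"], ["server-0", "server-1"], failed={"server-0"}))
```

In the broadcast case both plugins receive the start action, in the shared case the documents are split between the plugins, and in the failover case server-1 receives the result because server-0 is marked as failed.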
Fusion Integration
Fusion creates a Pulsar namespace for every app that is created. The default tenant name for Pulsar is {kubernetes-namespace}.
Pulsar does not store messages if there are no active subscriptions for the topic. Due to this, namespaces are configured for message retention with a default time of 1 day and a default size of 5GB.
Messages for topics that have subscriptions but no active consumers will be stored in the backlog. Backlog quota is managed per topic.
Pulsar is used by default for Signals. The _signals_ingest pipeline has a subscription to the _signals_ingest topic in the _system namespace. Signals belong to the _system app that is created in Fusion when the API service boots up.
Subscriptions are integrated with the Fusion UI. See Subscriptions UI for more information.
Troubleshooting Apache Pulsar in Fusion
See Troubleshooting Apache Pulsar Issues for more information.
Frequently asked questions
What is the difference between backlogs and retention policy?
Backlogs
Backlogs are:
-
Sets of unacknowledged messages for a topic.
-
Stored by Pulsar bookies until the messages are processed and acknowledged.
-
Governed by the following policies:
-
Backlog quota. An allowable size threshold for each topic in the namespace.
-
Backlog retention. The action the broker takes if the threshold is exceeded. The actions are producer_request_hold, producer_exception, and consumer_backlog_eviction.
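The three backlog retention actions can be sketched with a small in-memory model. This is an illustration only, based on how the Pulsar documentation describes the policies: the broker holds the producer's request without persisting it, rejects the producer with an exception, or discards the oldest backlog messages. The publish function and the BacklogFull exception are hypothetical, and the quota is counted in messages here, whereas Pulsar measures it in bytes.

```python
from collections import deque


class BacklogFull(Exception):
    """Raised to the producer under the producer_exception policy."""


def publish(backlog: deque, message: str, quota: int, policy: str) -> bool:
    """Try to add a message to a topic backlog; return True if it was stored."""
    if len(backlog) < quota:
        backlog.append(message)
        return True
    if policy == "producer_request_hold":
        return False  # the send is held and nothing is persisted
    if policy == "producer_exception":
        raise BacklogFull("backlog quota exceeded")
    if policy == "consumer_backlog_eviction":
        backlog.popleft()  # discard the oldest unacknowledged message
        backlog.append(message)
        return True
    raise ValueError(f"unknown policy: {policy}")


backlog = deque(["m1", "m2"])
publish(backlog, "m3", quota=2, policy="consumer_backlog_eviction")
print(list(backlog))
```

Only consumer_backlog_eviction loses data that was already stored; the other two policies push the problem back to the producer.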
Topic retention policy
The topic retention policy sets the size limit and the length of time that messages are kept after they are acknowledged. These settings are configured at the namespace level and applied to each topic.
What are the backlog and retention storage requirements?
The bookkeeper ensemble requires enough disk space to store messages retained in the backlog and kept after acknowledgement. The backlog and retention policies are applied independently to each topic. To calculate enough space, include the total for both backlog and retention needs across all topics.
By default, the following backlog and retention policy values are applied to each topic:
-
Backlog quota size: 10GB
-
Backlog retention policy: producer_request_hold
-
Topic retention policy size: 0
-
Topic retention policy time: 0
By default, Fusion creates the following topics:
-
_system/_signals_ingest
-
_system/_signals_ingest_res
-
_system/_fusion_crud_events
-
_system/_fusion_link_events
-
_logs/system_logs with:
-
Backlog quota size: 1GB
-
Backlog retention policy: consumer_backlog_eviction is turned on
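As a rough worked example of the sizing rule, the sketch below adds the backlog quota and the retention size for each default topic and multiplies the total by the number of copies BookKeeper writes. Several things here are assumptions: that the four _system topics use the default 10GB backlog quota, that the retention size of _logs/system_logs is 0, and that each message is stored twice (a write quorum of 2). The function name is hypothetical.

```python
def required_storage_gb(topics, write_quorum=2):
    """Sum backlog quota and retention size over all topics, times the copies written."""
    per_copy = sum(t["backlog_gb"] + t["retention_gb"] for t in topics)
    return per_copy * write_quorum


# Default topics with the assumed per-topic limits, in GB.
default_topics = [
    {"name": "_system/_signals_ingest", "backlog_gb": 10, "retention_gb": 0},
    {"name": "_system/_signals_ingest_res", "backlog_gb": 10, "retention_gb": 0},
    {"name": "_system/_fusion_crud_events", "backlog_gb": 10, "retention_gb": 0},
    {"name": "_system/_fusion_link_events", "backlog_gb": 10, "retention_gb": 0},
    {"name": "_logs/system_logs", "backlog_gb": 1, "retention_gb": 0},
]

print(required_storage_gb(default_topics))  # 41 GB per copy * 2 copies = 82
```

This is a worst-case figure: it is only reached if every topic fills its backlog quota at the same time. Topics created for additional apps add to the total in the same way.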
How do I change heap memory settings?
Change the default heap memory settings in the following Helm chart values:
-
broker.configData.PULSAR_MEM
-
bookkeeper.configData.PULSAR_MEM
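The dotted names above can be read as nested keys in a values override. The following sketch builds such an override and prints it as JSON, which is also valid YAML. It assumes that PULSAR_MEM takes a string of JVM options and that the keys sit at the top level of the values file; in a given deployment they may be nested under a parent key. The function name and the heap sizes are hypothetical examples, not recommended values.

```python
import json


def heap_overrides(broker_heap: str, bookkeeper_heap: str) -> dict:
    """Build a nested override that sets the initial and maximum heap for each component."""

    def jvm_opts(heap: str) -> str:
        # -Xms and -Xmx are the standard JVM flags for initial and maximum heap size.
        return f"-Xms{heap} -Xmx{heap}"

    return {
        "broker": {"configData": {"PULSAR_MEM": jvm_opts(broker_heap)}},
        "bookkeeper": {"configData": {"PULSAR_MEM": jvm_opts(bookkeeper_heap)}},
    }


# Example sizes only; choose values that fit the memory limits of the pods.
print(json.dumps(heap_overrides("2g", "1g"), indent=2))
```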
What is the difference between broker and bookkeeper, and how do I scale them?
Broker
Broker is a stateless component responsible for:
-
Managing Pulsar topics.
-
Handling produce and consume operations coming from clients.
Additional information:
-
A single Pulsar topic can be assigned to only one broker instance. That instance is responsible for handling client requests for that topic.
-
The Pulsar topic-broker assignments must be distributed evenly (load balanced) to ensure optimal performance.
-
Broker instances are easily scaled up or down based on the number of topics and message volume in the cluster.
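To illustrate why even distribution matters when each topic is served by exactly one broker, the sketch below assigns every topic to the broker that currently owns the fewest topics. This is not Pulsar's load-balancing algorithm, which assigns groups of topics and takes resource usage into account. The function, broker, and topic names are hypothetical.

```python
def assign_topics(topics, brokers):
    """Assign each topic to exactly one broker, always picking the least-loaded one."""
    owned = {b: [] for b in brokers}
    for topic in topics:
        # On a tie, min() returns the first broker in the list.
        target = min(brokers, key=lambda b: len(owned[b]))
        owned[target].append(topic)
    return owned


print(assign_topics(["t1", "t2", "t3", "t4", "t5"], ["broker-0", "broker-1"]))
```

Adding a third broker to the list spreads the same five topics over three instances, which is the effect of scaling the broker up.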
Bookkeeper
Bookkeeper is responsible for:
-
Persisting message data.
-
Storing messages in a consistent, accessible, and fault-tolerant manner.
Parameter information
The following parameters, which are part of the namespace persistence settings, determine how each topic is stored in BookKeeper:
-
Ensemble size: Size of the pool of bookkeeper instances available for that ledger, to which Pulsar writes. The Fusion default is 2.
-
Write quorum: Number of bookkeepers to which Pulsar writes. The value is less than or equal to the Ensemble size. The Fusion default is 2.
-
ACK quorum: Number of bookkeepers that must acknowledge the write action for the Pulsar broker to send an acknowledgement to the client. The Fusion default is 2.
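The three parameters constrain each other: the write quorum cannot exceed the ensemble size, and the ACK quorum cannot exceed the write quorum. The following sketch checks a combination and reports what it implies. The function name and the keys of the returned dictionary are hypothetical.

```python
def check_persistence(ensemble: int, write_quorum: int, ack_quorum: int) -> dict:
    """Validate the three settings and summarize what they imply for a write."""
    if not (ensemble >= write_quorum >= ack_quorum >= 1):
        raise ValueError("expected ensemble >= write quorum >= ack quorum >= 1")
    return {
        # Every message is written to this many bookkeepers.
        "copies": write_quorum,
        # Bookkeepers that may still be pending when the broker acknowledges the client.
        "acks_that_may_be_missing": write_quorum - ack_quorum,
    }


print(check_persistence(2, 2, 2))  # the Fusion defaults
```

With the Fusion defaults of 2, 2, and 2, every message is stored twice, and both bookkeepers must confirm the write before the client receives an acknowledgement.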
Bookkeeper and fragment information
-
When a new bookkeeper is added to the cluster:
-
New fragments or ledgers that are created write to the new bookkeeper immediately.
-
Manual load balancing may not be necessary. Use standard procedures to load balance the system.
-
Each fragment can be stored on a different subset of bookies in the cluster.
-
Topics and ledgers are not coupled to a set of bookkeeper nodes.
Rack information
A rack can be a logical construct. For example, an Availability Zone (AZ) in a cloud environment.
A rack-aware policy allows:
-
Bookkeepers to be marked as belonging to a specific rack.
-
The bookkeeper client of the Pulsar broker to select bookies from different racks.
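The idea behind rack-aware selection can be sketched as follows: group the bookkeepers by rack, then take one from each rack in turn until enough are chosen. This is a simplified illustration, not the placement policy that BookKeeper implements. The function name, bookkeeper names, and rack names are hypothetical.

```python
def pick_bookies(bookie_racks: dict, write_quorum: int) -> list:
    """Pick write_quorum bookkeepers, spreading them over as many racks as possible."""
    by_rack = {}
    for bookie, rack in sorted(bookie_racks.items()):
        by_rack.setdefault(rack, []).append(bookie)
    chosen = []
    # Take one bookkeeper per rack in turn until enough are chosen.
    while len(chosen) < write_quorum and any(by_rack.values()):
        for rack in sorted(by_rack):
            if by_rack[rack] and len(chosen) < write_quorum:
                chosen.append(by_rack[rack].pop(0))
    if len(chosen) < write_quorum:
        raise ValueError("not enough bookkeepers for the requested write quorum")
    return chosen


racks = {"bk-0": "az-a", "bk-1": "az-a", "bk-2": "az-b"}
print(pick_bookies(racks, 2))
```

With two bookkeepers in az-a and one in az-b, a write quorum of 2 selects one bookkeeper from each zone, so the loss of a single zone does not remove every copy of a message.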
Brokers on individual nodes
In addition to rack awareness, you can configure individual brokers to use a bookkeeper node from a certain group. See Pulsar configuration for more information.
Additional Pulsar information
See the following topics for more information: