Fusion 5.10

    Apache Pulsar

    Basics

    • Pulsar gives you the ability to subscribe a pipeline to a topic

    • Topics belong to namespaces

      • At the namespace level, you can specify retention policies and schema enforcement

    • Namespaces belong to tenants

      • Tenants can specify authorization/authentication

    Highlights

    • Supports native streaming transformations through Pulsar Functions

    • Uses Apache ZooKeeper for configuration management

    • Uses Apache BookKeeper for durable storage

    • Each document processed has a result object persisted to another topic

    Overview

    Apache Pulsar is a message bus that combines queuing with publish-subscribe features for server-to-server messaging. A producer publishes a message to a topic, and a consumer subscribes to that topic using a subscription type to receive the topic's messages.

    With the release of Fusion 5.4, an existing subscription’s Subscription Type can no longer be changed via the UI or API. Delete the existing subscription and create a new subscription when you want to change the subscription type.

    Thanks to Pulsar’s messaging guarantees (and Apache BookKeeper, which provides the durable storage), messages that have not been acknowledged by a consumer are persisted and re-published. This means an item can be saved to the crawl-db collection once, when the item is completed (versus being stored in Solr first, queried for, and then fetched).

    Apache Pulsar is a multi-tenant messaging system. Several tenants can use the message bus concurrently. Each tenant can have multiple namespaces and each namespace can have multiple topics.
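
    The tenant, namespace, and topic levels show up directly in Pulsar's fully qualified topic names, which take the form persistent://tenant/namespace/topic. The following is a minimal sketch of that naming scheme. The helper name topic_url and the tenant value "fusion" are hypothetical, chosen only for illustration.

```python
def topic_url(tenant: str, namespace: str, topic: str, persistent: bool = True) -> str:
    """Build a fully qualified Pulsar topic name from its three levels."""
    for part in (tenant, namespace, topic):
        # Each level is a single path segment, so it cannot be empty or contain "/".
        if not part or "/" in part:
            raise ValueError(f"invalid name component: {part!r}")
    scheme = "persistent" if persistent else "non-persistent"
    return f"{scheme}://{tenant}/{namespace}/{topic}"


# Hypothetical tenant "fusion"; the namespace and topic are the ones named on this page.
signals_topic = topic_url("fusion", "_system", "_signals_ingest")
print(signals_topic)
```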

    Even though Pulsar itself is capable of moving data between clusters, Fusion does not support configuring Apache Pulsar to perform geo-replication.

    Types of server-to-plugin calls:

    • Broadcast (pub/sub). The server publishes a message to a topic. Each plugin instance consumes the message through its own unique, exclusive subscription. In this scenario, all plugin instances for a given configId consume the same message. Examples include job actions (start, stop) and health status requests.

    • Queueing. The server publishes messages to a topic, where plugins consume through a shared subscription. Subscriptions that are shared allow multiple consumers to subscribe and consume messages one by one. Shared subscriptions are needed for distributed fetching.

    • Failover. The connector job service processes messages from a plugin output topic. This works like an exclusive subscription, in that a single consumer receives the messages, but it allows another server instance to take over if the current instance fails.
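
    The three call types differ only in which consumers receive each message. The sketch below models that difference in plain Python, without a Pulsar client. It is a simplified illustration, not how the broker is implemented, and the function name deliver and the consumer names are hypothetical.

```python
from collections import defaultdict


def deliver(messages, consumers, mode, failed=()):
    """Return {consumer: [messages]} for one subscription pattern.

    broadcast: each consumer has its own exclusive subscription,
               so every consumer receives every message.
    shared:    one subscription; messages are handed out round-robin.
    failover:  one subscription; only the first healthy consumer receives.
    """
    received = defaultdict(list)
    healthy = [c for c in consumers if c not in failed]
    if not healthy:
        return {}
    for i, msg in enumerate(messages):
        if mode == "broadcast":
            for consumer in healthy:
                received[consumer].append(msg)
        elif mode == "shared":
            received[healthy[i % len(healthy)]].append(msg)
        elif mode == "failover":
            received[healthy[0]].append(msg)
        else:
            raise ValueError(f"unknown mode: {mode}")
    return dict(received)


plugins = ["plugin-a", "plugin-b"]
servers = ["server-1", "server-2"]
broadcast = deliver(["start"], plugins, "broadcast")
shared = deliver(["doc1", "doc2", "doc3"], plugins, "shared")
failover = deliver(["out1", "out2"], servers, "failover", failed=["server-1"])
```

    In this model, both plugins see the "start" job action, the three documents are split between the plugins, and server-2 takes over the output topic once server-1 is marked as failed.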

    Fusion Integration

    Fusion will create a Pulsar namespace for every app that is created. The default tenant name for Pulsar is {kubernetes-namespace}. Pulsar does not store messages if there are no active subscriptions for the topic. Due to this, namespaces are configured for message retention with a default time of 1 day and a default size of 5GB.

    Messages for topics that have subscriptions but no active consumers will be stored in the backlog. Backlog quota is managed per topic.

    Pulsar is used by default for Signals. The _signals_ingest pipeline has a subscription to the _signals_ingest topic in the _system namespace. Signals belong to the _system app, which is created in Fusion when the API service boots up.

    Subscriptions are integrated with the Fusion UI. See Subscriptions UI for more information.

    Frequently asked questions

    What is the difference between backlogs and retention policy?

    Backlogs

    Backlogs are:

    • Sets of unacknowledged messages for a topic.

    • Stored by Pulsar bookies until the messages are processed and acknowledged.

    • Governed by the following policies:

      • Backlog quota. An allowable size threshold for each topic in the namespace.

      • Backlog retention. The action the broker takes if the threshold is exceeded. The actions are producer_request_hold, producer_exception, and consumer_backlog_eviction.
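
    The effect of the three backlog retention actions can be sketched as a small model. This is a simplified illustration under stated assumptions: message sizes are plain integers, the quota is checked before each message is accepted, and the names publish and ProducerBlocked are hypothetical rather than part of any Pulsar API.

```python
class ProducerBlocked(Exception):
    """Stands in for the error a producer sees under producer_exception."""


def publish(backlog, message_size, quota, policy):
    """Apply a backlog quota policy to one incoming message.

    backlog is a list of unacknowledged message sizes, oldest first.
    Returns (new_backlog, accepted).
    """
    if sum(backlog) + message_size <= quota:
        return backlog + [message_size], True
    if policy == "producer_request_hold":
        # The broker holds the produce request; the message is not persisted.
        return list(backlog), False
    if policy == "producer_exception":
        raise ProducerBlocked("backlog quota exceeded")
    if policy == "consumer_backlog_eviction":
        # Discard the oldest unacknowledged messages until the new one fits.
        trimmed = list(backlog)
        while trimmed and sum(trimmed) + message_size > quota:
            trimmed.pop(0)
        return trimmed + [message_size], True
    raise ValueError(f"unknown policy: {policy}")


held = publish([5, 4], 3, quota=10, policy="producer_request_hold")
evicted = publish([5, 4], 3, quota=10, policy="consumer_backlog_eviction")
```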

    Topic retention policy

    Topic retention policy sets the size limit and the amount of time that messages are kept after they are acknowledged. These settings are configured at the namespace level and applied to each topic.

    What are the backlog and retention storage requirements?

    The bookkeeper ensemble requires enough disk space to store messages retained in the backlog and kept after acknowledgement. The backlog and retention policies are applied independently to each topic. To calculate enough space, include the total for both backlog and retention needs across all topics.

    By default, the following backlog and retention policy values are applied to each topic:

    • Backlog quota size: 10GB

    • Backlog retention policy: producer_request_hold

    • Topic retention policy size: 0

    • Topic retention policy time: 0

    By default, Fusion creates the following topics:

    • _system/_signals_ingest

    • _system/_signals_ingest_res

    • _system/_fusion_crud_events

    • _system/_fusion_link_events

    • _logs/system_logs with:

      • Backlog quota size: 1GB

      • Backlog retention policy: consumer_backlog_eviction
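
    As a worked example of the storage calculation, the sketch below sums the backlog quota and retention size for the default topics. It assumes that the four _system topics use the 10GB default backlog quota and that all topics keep the default retention size of 0. The write_quorum multiplier is also an assumption: it reflects the idea that each message is stored on that many bookies, so confirm it against your own persistence settings. The function name is hypothetical.

```python
def required_storage_gb(topics, write_quorum=1):
    """Sum backlog quota and retention size over all topics, in GB.

    topics maps topic name -> (backlog_quota_gb, retention_size_gb).
    write_quorum multiplies the total, assuming each message is stored
    on that many bookies.
    """
    logical = sum(backlog + retention for backlog, retention in topics.values())
    return logical * write_quorum


# Assumed values: 10GB default backlog quota, retention size 0,
# and the 1GB quota listed for _logs/system_logs.
default_topics = {
    "_system/_signals_ingest": (10, 0),
    "_system/_signals_ingest_res": (10, 0),
    "_system/_fusion_crud_events": (10, 0),
    "_system/_fusion_link_events": (10, 0),
    "_logs/system_logs": (1, 0),
}

logical_gb = required_storage_gb(default_topics)
raw_gb = required_storage_gb(default_topics, write_quorum=2)
print(logical_gb, raw_gb)
```

    Under these assumptions the worst case is 41GB of message data, or 82GB of disk across the bookies with a write quorum of 2. Any topics you add, and any non-zero retention size, increase the total.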

    How do I change heap memory settings?

    Change the default heap memory settings in the following Helm chart values:

    • broker.configData.PULSAR_MEM

    • bookkeeper.configData.PULSAR_MEM

    What is the difference between broker and bookkeeper, and how do I scale them?

    Broker

    Broker is a stateless component responsible for:

    • Managing Pulsar topics.

    • Handling produce and consume operations coming from clients.

    Additional information:

    • A single Pulsar topic can be assigned to only one broker instance. That instance is responsible for handling client requests for that topic.

    • The Pulsar topic-broker assignments must be distributed evenly (load balanced) to ensure optimal performance.

    • Broker instances are easily scaled up or down based on the number of topics and message volume in the cluster.

    Bookkeeper

    Bookkeeper is responsible for:

    • Persisting message data.

    • Storing messages in a consistent, accessible, and fault-tolerant manner.

    Parameter information

    The following parameters, which are part of the namespace persistence settings, determine how each topic is stored in bookkeeper:

    • Ensemble size: Size of the pool of bookkeeper instances available for that ledger, to which Pulsar writes. The Fusion default is 2.

    • Write quorum: Number of bookkeepers to which Pulsar writes each message. The value must be less than or equal to the Ensemble size. The Fusion default is 2.

    • ACK quorum: Number of bookkeepers that must acknowledge the write action for the Pulsar broker to send an acknowledgement to the client. The Fusion default is 2.
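
    The relationship between the three parameters can be sketched as follows. The ordering check (ensemble size >= write quorum >= ACK quorum) follows from the definitions above. The round-robin placement in write_set is a simplified model of how entries are striped over an ensemble, and all function and bookie names are hypothetical.

```python
def validate_persistence(ensemble, write_quorum, ack_quorum):
    """Raise ValueError unless ensemble >= write quorum >= ACK quorum >= 1."""
    if not (ensemble >= write_quorum >= ack_quorum >= 1):
        raise ValueError("require ensemble >= write quorum >= ack quorum >= 1")


def write_set(entry_id, bookies, write_quorum):
    """Bookies that store one entry, striped round-robin over the ensemble."""
    ensemble = len(bookies)
    start = entry_id % ensemble
    return [bookies[(start + i) % ensemble] for i in range(write_quorum)]


def unanswered_writes_tolerated(write_quorum, ack_quorum):
    """Bookies in the write set that may fail to respond without blocking the ack."""
    return write_quorum - ack_quorum


# Fusion defaults from this page: 2 / 2 / 2.
validate_persistence(2, 2, 2)
default_set = write_set(0, ["bookie-0", "bookie-1"], 2)
default_slack = unanswered_writes_tolerated(2, 2)

# Hypothetical larger ensemble: entries rotate across three bookies.
striped = [write_set(i, ["bookie-0", "bookie-1", "bookie-2"], 2) for i in range(3)]
```

    With the defaults, every message is written to both bookkeepers and both must acknowledge it, so in this model a write cannot be acknowledged while either bookkeeper is unresponsive.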

    Bookkeeper and fragment information

    • When a new bookkeeper is added to the cluster:

      • New fragments or ledgers that are created write to the new bookkeeper immediately.

      • Manual load balancing may not be necessary. If it is, use standard procedures to load balance the system.

    • Each fragment can be stored on a different subset of bookies in the cluster.

    • Topics and ledgers are not coupled to a set of bookkeeper nodes.

    Rack information

    A rack can be a logical construct, for example an Availability Zone (AZ) in a cloud environment.

    A rack-aware policy allows:

    • Bookkeepers to be marked as belonging to a specific rack.

    • The bookkeeper client of the Pulsar broker to select bookies from different racks.
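
    The idea behind rack-aware selection can be illustrated with a short sketch that spreads the chosen bookies over as many racks as possible. This is not the placement algorithm that Pulsar or BookKeeper actually uses. It is a simplified greedy model, and the function name, bookie names, and rack labels are hypothetical.

```python
def pick_bookies(bookie_racks, count):
    """Choose `count` bookies, spreading them over as many racks as possible.

    bookie_racks maps bookie name -> rack label (for example, an AZ name).
    """
    if count > len(bookie_racks):
        raise ValueError("not enough bookies")
    by_rack = {}
    for bookie, rack in sorted(bookie_racks.items()):
        by_rack.setdefault(rack, []).append(bookie)
    chosen = []
    # Take one bookie from each rack in turn until enough are chosen.
    while len(chosen) < count:
        for rack in sorted(by_rack):
            if by_rack[rack] and len(chosen) < count:
                chosen.append(by_rack[rack].pop(0))
    return chosen


racks = {"bookie-0": "az-a", "bookie-1": "az-a", "bookie-2": "az-b"}
two = pick_bookies(racks, 2)
three = pick_bookies(racks, 3)
```

    With two bookies requested, the model picks one from each Availability Zone, so losing a single zone does not remove every copy of a message.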

    Brokers on individual nodes

    In addition to rack awareness, you can configure individual brokers to use a bookkeeper node from a certain group. See Pulsar configuration for more information.