Pulsar

Integration Details

The DataHub Pulsar source plugin extracts topic and schema metadata from an Apache Pulsar instance and ingests it into DataHub. The plugin interacts with the Pulsar instance through the Pulsar admin REST API, which it uses to list tenants, namespaces and topics and to retrieve the schema associated with each topic.

The data is extracted per tenant and per namespace. Each topic, together with its schema if one is available, is ingested into DataHub as a Dataset. Additional values such as the schema description, schema_version, schema_type and partitioned are included as DatasetProperties.
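
The tenant, namespace, topic and schema walk described above can be sketched in a few lines of Python. This is a minimal illustration, not the plugin's implementation: the helper names `http_get_json` and `list_topics_with_schema` are hypothetical, authentication is omitted, only persistent topics are listed, and the `/admin/v2/...` paths are the Pulsar admin REST endpoints as we understand them.

```python
import json
from typing import Callable, Dict, Optional
from urllib.error import HTTPError
from urllib.request import urlopen


def http_get_json(url: str, timeout: int = 5):
    """Fetch a URL and decode its JSON body; return None on HTTP 404."""
    try:
        with urlopen(url, timeout=timeout) as resp:
            return json.load(resp)
    except HTTPError as err:
        if err.code == 404:
            return None  # e.g. a topic without a registered schema
        raise


def list_topics_with_schema(
    base_url: str, fetch: Callable[[str], object] = http_get_json
) -> Dict[str, Optional[dict]]:
    """Walk tenants -> namespaces -> topics and look up each topic's schema."""
    result: Dict[str, Optional[dict]] = {}
    for tenant in fetch(f"{base_url}/admin/v2/tenants") or []:
        # Namespaces come back as "tenant/namespace" strings.
        for namespace in fetch(f"{base_url}/admin/v2/namespaces/{tenant}") or []:
            for topic in fetch(f"{base_url}/admin/v2/persistent/{namespace}") or []:
                # A topic looks like "persistent://tenant/namespace/name".
                name = topic.rsplit("/", 1)[-1]
                schema_url = f"{base_url}/admin/v2/schemas/{namespace}/{name}/schema"
                result[topic] = fetch(schema_url)
    return result
```

Passing `fetch` as a parameter keeps the walk testable without a running Pulsar instance; against a real cluster the default `http_get_json` would be used, for example `list_topics_with_schema("http://localhost:8080")`.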

Concept Mapping

This ingestion source maps the following Source System Concepts to DataHub Concepts:

| Source Concept | DataHub Concept | Notes |
| --- | --- | --- |
| pulsar | Data Platform | |
| Pulsar Topic | Dataset | subType: topic |
| Pulsar Schema | SchemaField | Maps to the fields defined within the Avro or JSON schema definition. |
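
To make the topic-to-Dataset mapping concrete, the sketch below builds a DataHub dataset URN for a Pulsar topic. The function `pulsar_dataset_urn` is a hypothetical helper, and using the full topic name (for example `persistent://tenant/namespace/topic`) as the dataset name is an assumption; the plugin may name datasets differently.

```python
from typing import Optional


def pulsar_dataset_urn(
    topic: str, env: str = "PROD", platform_instance: Optional[str] = None
) -> str:
    """Build a dataset URN on the pulsar data platform (illustrative only)."""
    # When a platform instance is configured, it is prefixed to the dataset name.
    name = f"{platform_instance}.{topic}" if platform_instance else topic
    return f"urn:li:dataset:(urn:li:dataPlatform:pulsar,{name},{env})"


urn = pulsar_dataset_urn("persistent://tenant_1/sales/orders", env="TEST", platform_instance="local")
```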

Metadata Ingestion Quickstart

For context on getting started with ingestion, check out our metadata ingestion guide.

Module pulsar

Incubating

Important Capabilities

| Capability | Status | Notes |
| --- | --- | --- |
| Domains | ✅ | Supported via the domain config field |
| Platform Instance | ✅ | Enabled by default |

PulsarSource(config: datahub.ingestion.source_config.pulsar.PulsarSourceConfig, ctx: datahub.ingestion.api.common.PipelineContext)

NOTE: Always use TLS encryption in a production environment and use variable substitution for sensitive information (e.g. ${CLIENT_ID} and ${CLIENT_SECRET}).
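
As an illustration of what variable substitution means here, the following sketch resolves `${NAME}` placeholders from the process environment using only the Python standard library. It shows the idea rather than DataHub's own resolution logic, and `resolve_env_placeholders` is a hypothetical helper name.

```python
import os
from typing import Dict


def resolve_env_placeholders(fragment: Dict[str, str]) -> Dict[str, str]:
    """Replace ${NAME} placeholders with values from the environment."""
    # os.path.expandvars leaves a placeholder untouched if the variable is unset.
    return {key: os.path.expandvars(value) for key, value in fragment.items()}
```

Keeping secrets in environment variables means the recipe file itself can be committed or shared without exposing credentials.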

Prerequisites

In order to ingest metadata from Apache Pulsar, you will need:

  • Access to a Pulsar instance and, if authentication is enabled, a valid access token.
  • Pulsar version >= 2.7.0

NOTE: A superUser role is required for listing all existing tenants within a Pulsar instance.

CLI based Ingestion

Install the Plugin

pip install 'acryl-datahub[pulsar]'

Starter Recipe

Check out the following recipe to get started with ingestion! See below for full configuration options.

For general pointers on writing and running a recipe, see our main recipe guide.

source:
  type: "pulsar"
  config:
    env: "TEST"
    platform_instance: "local"
    ## Pulsar client connection config ##
    web_service_url: "https://localhost:8443"
    verify_ssl: "/opt/certs/ca.cert.pem"
    # Issuer url for auth document, for example "http://localhost:8083/realms/pulsar"
    issuer_url: <issuer_url>
    client_id: ${CLIENT_ID}
    client_secret: ${CLIENT_SECRET}
    # Tenant list to scrape
    tenants:
      - tenant_1
      - tenant_2
    # Topic filter pattern
    topic_patterns:
      allow:
        - ".*sales.*"

sink:
  # sink configs
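
The same recipe can also be expressed as a Python dictionary and run programmatically. The sketch below assumes the `acryl-datahub[pulsar]` package is installed and that a DataHub instance accepts metadata at `http://localhost:8080` through the `datahub-rest` sink; the sink settings and the helper names `build_recipe` and `run_recipe` are assumptions for illustration, not part of the recipe above.

```python
def build_recipe(issuer_url: str, client_id: str, client_secret: str) -> dict:
    """Return the starter recipe as a plain dictionary."""
    return {
        "source": {
            "type": "pulsar",
            "config": {
                "env": "TEST",
                "platform_instance": "local",
                "web_service_url": "https://localhost:8443",
                "verify_ssl": "/opt/certs/ca.cert.pem",
                "issuer_url": issuer_url,
                "client_id": client_id,
                "client_secret": client_secret,
                "tenants": ["tenant_1", "tenant_2"],
                "topic_patterns": {"allow": [".*sales.*"]},
            },
        },
        # Assumed sink: adjust the type and server to your deployment.
        "sink": {"type": "datahub-rest", "config": {"server": "http://localhost:8080"}},
    }


def run_recipe(recipe: dict) -> None:
    """Run the recipe; the import is deferred so the module loads without datahub."""
    from datahub.ingestion.run.pipeline import Pipeline

    pipeline = Pipeline.create(recipe)
    pipeline.run()
    pipeline.raise_from_status()
```

In this form the credentials would typically be read from the environment by the caller, for example `build_recipe(url, os.environ["CLIENT_ID"], os.environ["CLIENT_SECRET"])`, rather than written into the source.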

Config Details

Note that a . is used to denote nested fields in the YAML recipe.

View All Configuration Options
| Field [Required] | Type | Description | Default | Notes |
| --- | --- | --- | --- | --- |
| client_id [✅] | string | The application's client ID | None | |
| client_secret [✅] | string | The application's client secret | None | |
| exclude_individual_partitions [✅] | boolean | Exclude the individual partitions of a partitioned topic, e.g. when turned off a topic with 100 partitions will result in 100 Datasets. | True | |
| issuer_url [✅] | string | The complete URL for a Custom Authorization Server. Mandatory for OAuth based authentication. | None | |
| oid_config [✅] | object | Placeholder for OpenId discovery document | None | |
| platform_instance [✅] | string | The instance of the platform that all assets produced by this recipe belong to | None | |
| tenants [✅] | array(string) | | None | |
| timeout [✅] | integer | Timeout setting, how long to wait for the Pulsar REST API to send data before giving up | 5 | |
| token [✅] | string | The access token for the application. Mandatory for token based authentication. | None | |
| verify_ssl [✅] | UnionType (See notes for variants) | Either a boolean, in which case it controls whether we verify the server's TLS certificate, or a string, in which case it must be a path to a CA bundle to use. | True | One of boolean,string |
| web_service_url [✅] | string | The web URL for the cluster. | http://localhost:8080 | |
| env [✅] | string | The environment that all assets produced by this connector belong to | PROD | |
| domain [✅] | map(str,AllowDenyPattern) | A class to store allow deny regexes | None | |
| domain.key.allow [❓ (required if domain is set)] | array(string) | | None | |
| domain.key.deny [❓ (required if domain is set)] | array(string) | | None | |
| domain.key.ignoreCase [❓ (required if domain is set)] | boolean | Whether to ignore case sensitivity during pattern matching. | True | |
| namespace_patterns [✅] | AllowDenyPattern | List of regex patterns for namespaces to include/exclude from ingestion. By default the functions namespace is denied. | {'allow': ['.*'], 'deny': ['public/functions'], 'ignoreCase': True} | |
| namespace_patterns.allow [❓ (required if namespace_patterns is set)] | array(string) | | None | |
| namespace_patterns.deny [❓ (required if namespace_patterns is set)] | array(string) | | None | |
| namespace_patterns.ignoreCase [❓ (required if namespace_patterns is set)] | boolean | Whether to ignore case sensitivity during pattern matching. | True | |
| tenant_patterns [✅] | AllowDenyPattern | List of regex patterns for tenants to include/exclude from ingestion. By default all tenants are allowed. | {'allow': ['.*'], 'deny': ['pulsar'], 'ignoreCase': True} | |
| tenant_patterns.allow [❓ (required if tenant_patterns is set)] | array(string) | | None | |
| tenant_patterns.deny [❓ (required if tenant_patterns is set)] | array(string) | | None | |
| tenant_patterns.ignoreCase [❓ (required if tenant_patterns is set)] | boolean | Whether to ignore case sensitivity during pattern matching. | True | |
| topic_patterns [✅] | AllowDenyPattern | List of regex patterns for topics to include/exclude from ingestion. By default the Pulsar system topics are denied. | {'allow': ['.*'], 'deny': ['/__.*$'], 'ignoreCase': True} | |
| topic_patterns.allow [❓ (required if topic_patterns is set)] | array(string) | | None | |
| topic_patterns.deny [❓ (required if topic_patterns is set)] | array(string) | | None | |
| topic_patterns.ignoreCase [❓ (required if topic_patterns is set)] | boolean | Whether to ignore case sensitivity during pattern matching. | True | |
| stateful_ingestion [✅] | StatefulStaleMetadataRemovalConfig | see Stateful Ingestion | None | |
| stateful_ingestion.enabled [❓ (required if stateful_ingestion is set)] | boolean | The type of the ingestion state provider registered with datahub. | None | |
| stateful_ingestion.ignore_new_state [❓ (required if stateful_ingestion is set)] | boolean | If set to True, ignores the current checkpoint state. | None | |
| stateful_ingestion.ignore_old_state [❓ (required if stateful_ingestion is set)] | boolean | If set to True, ignores the previous checkpoint state. | None | |
| stateful_ingestion.remove_stale_metadata [❓ (required if stateful_ingestion is set)] | boolean | Soft-deletes the entities present in the last successful run but missing in the current run with stateful_ingestion enabled. | True | |
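
The allow/deny fields of the tenant, namespace and topic patterns can be read as a simple filter. The sketch below is one plausible interpretation, assuming that a deny match takes precedence over an allow match and that patterns are matched from the start of the name with Python's `re.match`; `is_allowed` is a hypothetical helper and the plugin's actual matching rules may differ.

```python
import re
from typing import Sequence


def is_allowed(
    name: str,
    allow: Sequence[str] = (".*",),
    deny: Sequence[str] = (),
    ignore_case: bool = True,
) -> bool:
    """Return True if name matches an allow pattern and no deny pattern."""
    flags = re.IGNORECASE if ignore_case else 0
    # Deny wins: a single matching deny pattern rejects the name.
    if any(re.match(pattern, name, flags) for pattern in deny):
        return False
    return any(re.match(pattern, name, flags) for pattern in allow)


# Using the default namespace_patterns from the table above.
functions_ok = is_allowed("public/functions", deny=["public/functions"])
default_ok = is_allowed("public/default", deny=["public/functions"])
```

Under these assumptions `functions_ok` is False and `default_ok` is True, which corresponds to the documented default of denying the functions namespace.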

Code Coordinates

  • Class Name: datahub.ingestion.source.pulsar.PulsarSource
  • Browse on GitHub

Questions

If you have any questions about configuring ingestion for Pulsar, feel free to ping us on our Slack.