Pulsar topic-to-table parameters
The connector writes messages from your Apache Pulsar™ topics to tables in your database using the mapping and topic properties set in your connector’s configuration YAML file.
This includes the topic name, keyspace name, table name, schema mapping, codec options, and other write options.
Use the following syntax in the configuration YAML file:
topics: topic_name, topic_name1, topic_name2
topic:
  topic_name:
    keyspace_name:
      table_name:
        mapping: 'record=key, content=value'
        consistencyLevel: LOCAL_ONE
        ttlTimeUnit: SECONDS
        ttl: -1
        nullToUnset: true
        deletesEnabled: true
    codec:
      locale: en_US
      timeZone: UTC
      timestamp: CQL_TIMESTAMP
      date: ISO_LOCAL_DATE
      time: ISO_LOCAL_TIME
      unit: MILLISECONDS
Core parameters
The essential top-level parameters are topics and topic, together with the topic, keyspace, and table names nested under topic.
topics: topic_name, topic_name1, topic_name2
topic:
  topic_name:
    keyspace_name:
      table_name:
- topics
-
A comma-separated list of all topics to which the DataStax connector subscribes.
- topic
-
A container for the configuration details for each subscribed topic. For each subscribed topic, provide the topic, keyspace, and table name nested under topic. For example:
topic:
  my_topic:
    my_keyspace:
      my_table:
Make sure that the keyspace and table are the correct targets for the topic. This is where the connector writes the messages from the topic.
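As a sketch of how the core parameters fit together, the following fragment subscribes to two topics and routes each one to its own table. All topic, keyspace, table, and column names here are hypothetical placeholders, and the mappings assume that each message has a simple key and value.

```yaml
# Hypothetical names throughout: replace them with your own topics, keyspaces, and tables.
topics: orders_topic, payments_topic
topic:
  orders_topic:            # must match an entry in the topics list
    shop_ks:               # target keyspace
      orders:              # target table
        mapping: 'order_id=key, details=value'
  payments_topic:
    billing_ks:
      payments:
        mapping: 'payment_id=key, details=value'
```

Each name listed in topics has a matching block under topic, so every subscribed topic has an explicit keyspace and table target.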
Table parameters
These parameters are nested under the table name in the configuration YAML file. They define the mapping and write options for the records inserted into the table.
topics: topic_name
topic:
  topic_name:
    keyspace_name:
      table_name:
        mapping: 'record=key, content=value'
        consistencyLevel: LOCAL_ONE
        ttlTimeUnit: SECONDS
        ttl: -1
        nullToUnset: true
        deletesEnabled: true
- consistencyLevel
-
Query consistency level for writes. Valid values depend on your database provider and use case.
Default: LOCAL_ONE
- deletesEnabled
-
This parameter controls the handling of records that cause all columns to contain null values except for the primary key (PK) columns.
If false (disabled), records are always applied as INSERT/UPDATE statements, even if all non-PK columns would become null.
If true (enabled, default), a record that results in all-null values (except for PK columns) is applied as a DELETE, rather than an INSERT/UPDATE that would write nulls to all non-PK columns.
If true, the mapping must include all columns in order for the setting to function. If the mapping doesn’t include all columns, this parameter is treated as false.
The behavior applies to the post-mapping result. For example, if a row has a mix of null and non-null values, and the new record nullifies all of the non-null values, the result is a DELETE.
- mapping
-
Required. A mapping of Pulsar fields to table columns. The exact contents depend on the message format and table schema. For examples and options, see the other mapping topics in this documentation and Mapping properties and functions.
- nullToUnset
-
Whether to treat null values in Pulsar as UNSET in the database table. This parameter controls the handling of updates versus overrides.
If true, the DataStax Pulsar connector treats null values as unset fields. When a new record arrives, it only updates fields that aren’t null in the incoming message. DataStax recommends setting this to true to avoid creating unnecessary tombstones.
If false, fields with null values are visible in the table as UNSET values.
Default: true
- query
- ttl, ttlTimeUnit, __ttl
-
The default time-to-live (TTL) is -1, which means that TTL is disabled. If you want to use TTL, there are two ways to set TTL in your connector configuration.
Static TTL
In the table configuration, set ttl to the number of seconds that a record remains in the table before it is automatically deleted.
table_name:
  mapping: 'col1=key.f1, col2=value.f1'
  ttlTimeUnit: SECONDS
  ttl: 80000
Dynamic TTL
In the mapping parameter, you can use the __ttl property to specify the column to use as the TTL value for the record being inserted into the database table. For example:
table_name:
  mapping: 'col1=key.f1, col2=value.f1, __ttl=value.f2'
  ttlTimeUnit: SECONDS
If you set both static and dynamic TTL, the dynamic TTL (in mapping) takes precedence, and the connector writes a warning message to the log file.
If the dynamic TTL value is negative, an error is thrown when writing records to the table. However, the connector inserts the record without the TTL because the Pulsar connector is a streaming system.
If TTL is set (either statically or dynamically), the same TTL limit is applied to all rows in the mapped table. The DataStax Pulsar connector appends AND TTL <seconds> to all INSERT statements for the table.
With either static or dynamic TTL, you can optionally set ttlTimeUnit to specify the time unit for the ttl value. The default time unit is SECONDS. If you specify a different time unit, such as HOURS, the DataStax Pulsar connector automatically converts the value to SECONDS for compatibility.
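To illustrate how the table parameters combine, here is a minimal sketch for a hypothetical users table with primary key id and regular columns name and email. The names are placeholders, and LOCAL_QUORUM is only an assumed consistency level; check which values your database accepts.

```yaml
# Hypothetical table: primary key id, regular columns name and email.
topics: users_topic
topic:
  users_topic:
    app_ks:
      users:
        # Every table column is mapped, which deletesEnabled requires in order to take effect.
        mapping: 'id=key, name=value.name, email=value.email'
        consistencyLevel: LOCAL_QUORUM   # assumption: supported by your database
        nullToUnset: true
        deletesEnabled: true
        ttlTimeUnit: HOURS
        ttl: 24                          # 24 hours, converted to 86400 seconds
```

Following the descriptions above, a record whose name and email are both null would be applied as a DELETE for that id, and every other inserted row would expire after one day.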
Mapping properties and functions
The mapping parameter accepts additional properties that you can use to modify how the connector writes records to the table:
- header
-
In the mapping parameter, you can extract values from the message properties, and then write those values to a database table.
For example, the following mapping extracts the header property f4 from the message properties, and writes it to the col3 column in the specified table:
topic:
  topic_name:
    keyspace_name:
      table_name:
        mapping: 'col1=key.f1, col2=value.f1, __ttl=value.f2, __timestamp=value.f3, col3=header.f4'
Use one or more individual mapping statements to extract the desired values and write them to table columns.
- __timestamp, timestampTimeUnit
-
In the mapping parameter, you can use the optional __timestamp property to specify which column should be used for the writetime timestamp when a record is written to the database. The specified __timestamp column must be a number type. For example:
mapping: 'col1=key.f1, col2=value.f1, __timestamp=value.f2'
By default, the database internally tracks the writetime timestamp of records inserted from Pulsar. Use the __timestamp property when your Pulsar records have an explicit timestamp field that you want to use as the writetime for the database record produced by the connector.
If you set __timestamp, you can also set timestampTimeUnit to specify the time unit for the provided timestamp value:
table_name:
  timestampTimeUnit: SECONDS
  mapping: 'col1=key.f1, col2=value.f1, __timestamp=value.f2'
The default timestamp time unit is MICROSECONDS. If you specify a different time unit, such as SECONDS, the DataStax Pulsar connector automatically converts the value to MICROSECONDS for compatibility.
- __ttl
-
See ttl.
- now()
-
In the mapping parameter, you can use the optional now() function to set a column to the current timeuuid when a record is written to the database. The function gets the TIMEUUID, which is written to the table’s UUID and timeuuid column.
You can use the now() function on one or more columns, for example:
table_name:
  mapping: 'col1=now(), col2=now(), col3=now()'
A column mapped to now() must exist in the table definition, and it must have the type timeuuid.
If mapped to multiple columns, each occurrence of now() is evaluated and returns a different TIMEUUID.
If deletesEnabled is true and the record causes all non-primary key columns to be null, then the called now() function is ignored, and the record is deleted. This prevents the table from containing rows with only a primary key and timestamp.
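The mapping properties above can be combined in a single mapping string. The following sketch uses hypothetical names and assumes that the message value carries a numeric field created_ms holding milliseconds, that the message properties include a key named source, and that event_id is a timeuuid column.

```yaml
# Hypothetical topic, keyspace, table, column, and field names.
topics: events_topic
topic:
  events_topic:
    app_ks:
      events:
        # event_id is mapped to now(), so it must be a timeuuid column.
        # origin is filled from the message property named source.
        # __timestamp takes the writetime from the numeric field created_ms.
        mapping: 'event_id=now(), user_id=key, payload=value.payload, origin=header.source, __timestamp=value.created_ms'
        # created_ms is in milliseconds, so the connector converts it to microseconds.
        timestampTimeUnit: MILLISECONDS
```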
Codec parameters
Use the codec parameters to configure date and time conversion for each Pulsar topic if you don’t want to use the default settings:
topics: topic_name
topic:
  topic_name:
    codec:
      locale: en_US
      timeZone: UTC
      timestamp: CQL_TIMESTAMP
      date: ISO_LOCAL_DATE
      time: ISO_LOCAL_TIME
      unit: MILLISECONDS
    keyspace_name:
      table_name:
Note that the codec parameters are nested under the topic name (topic.topic_name.codec), and they apply to that topic only.
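Because codec settings are scoped to one topic, two topics can use different conversion settings in the same file. The sketch below uses hypothetical names and assumes that timeZone accepts a region ID such as Europe/Paris and that locale accepts a value such as fr_FR.

```yaml
# Hypothetical names; the timeZone and locale values are assumptions.
topics: eu_topic, us_topic
topic:
  eu_topic:
    codec:
      timeZone: Europe/Paris   # applies to eu_topic only
      locale: fr_FR
    eu_ks:
      readings:
        mapping: 'id=key, taken_at=value.taken_at'
  us_topic:
    # No codec block here, so the defaults (UTC, en_US, and so on) apply.
    us_ks:
      readings:
        mapping: 'id=key, taken_at=value.taken_at'
```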
In addition to the following settings, the connector supports any public static field in
- timestamp
-
The temporal pattern to use for string-to-CQL timestamp conversion. Allowed values include the following:
  - A date-time pattern, such as yyyy-MM-dd HH:mm:ss
  - A pre-defined formatter, such as ISO_ZONED_DATE_TIME or ISO_INSTANT
  - A special formatter, CQL_TIMESTAMP, that accepts all valid CQL literal formats for the timestamp type.
Default: CQL_TIMESTAMP
If the Pulsar records are strings that contain only digits that cannot be parsed by the timestamp format, use the unit parameter to specify the time unit of the parsed value.
- date
-
The temporal pattern to use for string-to-CQL date conversion. Allowed values include the following:
  - A date-time pattern, such as yyyy-MM-dd
  - A pre-defined formatter, such as ISO_LOCAL_DATE
Default: ISO_LOCAL_DATE
For example, to write a Pulsar string field like "2018-04-12" to a date column, set date: "yyyy-MM-dd".
- time
-
The temporal pattern to use for string-to-CQL time conversion. Allowed values include the following:
  - A date-time pattern, such as HH:mm:ss
  - A pre-defined formatter, such as ISO_LOCAL_TIME
Default: ISO_LOCAL_TIME
For example, to write a Pulsar string field like "10:15:30" to a time column, the default ISO_LOCAL_TIME correctly converts the time format.
- unit
-
If the Pulsar records are strings that contain only digits that cannot be parsed by the timestamp format, use the unit parameter to specify the time unit to apply to the parsed values. unit can be any TimeUnit enum constant.
Default: MILLISECONDS
- timeZone
-
The time zone to use for temporal conversions that don’t convey any explicit time zone information.
Default: UTC
For example, to write a Pulsar string field that contains values like 2018-03-09T17:12:32.584+01:00[Europe/Paris] to a timestamp column, set timestamp: ISO_ZONED_DATE_TIME.
- locale
-
Locale to use for locale-sensitive conversions.
Default: en_US
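As a sketch of non-default codec settings, the following fragment assumes a hypothetical topic whose timestamp strings look like 2018-04-12 10:15:30 with no zone information, and whose digit-only strings should be read as a count of seconds. All names are placeholders.

```yaml
# Hypothetical names; adjust the patterns to match your message contents.
topics: sensor_topic
topic:
  sensor_topic:
    codec:
      timestamp: 'yyyy-MM-dd HH:mm:ss'   # custom date-time pattern instead of CQL_TIMESTAMP
      timeZone: UTC                      # used because the pattern carries no zone
      date: 'yyyy-MM-dd'
      time: 'HH:mm:ss'
      unit: SECONDS                      # digit-only strings are interpreted in seconds
      locale: en_US
    iot_ks:
      readings:
        mapping: 'sensor_id=key, read_at=value.read_at'
```

The patterns are quoted so that the space and colons inside them are kept as part of the string value.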