Import with the Bulk Loader
Create and run Parallel Bulk Loader jobs
Use the Jobs manager to create and run Parallel Bulk Loader jobs. You can also use the Scheduler to schedule jobs.
In the procedures, select Parallel Bulk Loader as the job type and configure the job as needed.
Configuration settings for the Parallel Bulk Loader job
This section provides configuration settings for the Parallel Bulk Loader job. Also see configuration properties in the Jobs Configuration Reference.
Read settings
Setting | Description |
---|---|
format | Unique identifier of the data source provider. Spark scans the job’s classpath for a data source implementation that matches this identifier. |
path | Comma-delimited list of paths to load. Some data sources, such as parquet, require a path. Others, such as Solr, do not. Refer to the documentation for your data source to determine if you need to provide a path. |
readOptions | Options passed to the Spark SQL data source to configure the read operation. Options differ for every data source. Refer to the specific data source documentation for more information. |
 | List of Spark configuration settings needed to run the Parallel Bulk Loader. |
shellOptions | Behind the scenes, the Parallel Bulk Loader job submits a Scala script to the Fusion Spark shell. Use this setting to pass options to that shell. Two options appear in the examples on this page. --packages: Comma-separated list of Maven coordinates of JAR files to include on the driver and executor classpaths. Spark searches the local Maven repository, and then Maven central and any additional remote repositories given in the config. The format for the coordinates should be groupId:artifactId:version. --repositories: Comma-separated list of additional remote Maven repositories to search for the Maven coordinates given in the --packages option. |
timestampFieldName | For data sources that support time-based filters, the Parallel Bulk Loader computes the timestamp of the last document written to Solr and the current timestamp of the Parallel Bulk Loader job. For example, the HBase data source lets you filter the read between a minimum and a maximum timestamp. This lets Parallel Bulk Loader jobs run on schedules and pull only the newest rows from the underlying data sources. To support timestamp-based filtering, the Parallel Bulk Loader provides two simple macros, one of which is $lastTimestamp. See the Index HBase tables example below for an example of using this configuration property. |
Transformation settings
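Setting | Description |
---|---|
transformScala | Sometimes, you can write a small script to transform input data into the correct form for indexing. At other times, you might need the full power of the Spark API to transform data into an indexable form. A Scala script lets you filter and transform the input DataFrame (see the Read from Parquet example) or pull in a library to process content before indexing (see the Use NLP during indexing example). Your Scala script can do other things but, at a minimum, it must define the following function that the Parallel Bulk Loader invokes: def transform(inputDF: Dataset[Row]) : Dataset[Row] |
transformSql | SQL used to transform the input data before indexing. The input is exposed to the query as _input. See the Clean up data with SQL transformations example. |
 | If you have a Spark ML PipelineModel loaded into the blob store, you can supply the blob ID to the Parallel Bulk Loader. This lets you use Spark ML models to make predictions in a more scalable, performant manner than what can be achieved with a Machine Learning index stage. |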
Output settings
Setting | Description |
---|---|
outputCollection | Name of the Fusion collection to write to. The Parallel Bulk Loader uses the Collections API to resolve the underlying Solr collection at runtime. |
 | Name of a Fusion index pipeline to which to send documents, instead of directly indexing to Solr. This option lets you perform additional ETL (extract, transform, and load) processing on the documents before they are indexed in Solr. If you need to write to time-partitioned indexes, then you must use an index pipeline, because writing directly to Solr is not partition aware. |
defineFieldsUsingInputSchema | Flag to indicate if the Parallel Bulk Loader should use the input schema to create fields in Solr, after applying the Scala and/or SQL transformations. |
clearDatasource | If checked, the Parallel Bulk Loader deletes any existing documents in the output collection that match the query |
atomicUpdates | Flag to send documents directly to Solr as atomic updates instead of as new documents. This option is not supported when using an index profile. Also note that the Parallel Bulk Loader tracking fields |
 | Options used when writing directly to Solr. See Spark-Solr: https://github.com/lucidworks/spark-solr#index-parameters For example, if your docs are relatively small, you might want to increase the batch_size. |
 | Coalesce the DataFrame into N partitions before writing to Solr. This can help spread the indexing work out across more executors that are available in Spark, or limit the parallelism when writing to Solr. |
Tune performance
As the name of the Parallel Bulk Loader job implies, it is designed to ingest large amounts of data into Fusion by parallelizing the work across your Spark cluster. To achieve scalability, you might need to increase the amount of memory and/or CPU resources allocated to the job.
By default, Fusion’s Spark configuration settings control the resources allocated to Parallel Bulk Loader jobs.
You can pass these properties in the job configuration to override the default Spark shell options:
Parameter Name | Description and Default |
---|---|
--driver-cores | Cores for the driver. Default: 1 |
--driver-memory | Memory for the driver (for example, 1000M or 2G). Default: 1024M |
--executor-cores | Cores per executor. Default: 1 in YARN mode, or all available cores on the worker in standalone mode |
--executor-memory | Memory per executor (for example, 1000M or 2G). Default: 1G |
--total-executor-cores | Total cores for all executors. Default: Without setting this parameter, the total cores for all executors is the number of executors in YARN mode, or all available cores on all workers in standalone mode. |
Examples
Here we provide screenshots and example JSON job definitions to illustrate key points about how to load from different data sources.
Use NLP during indexing
In this example, we use the John Snow Labs NLP library during indexing. This is a quick-and-dirty example that only shows the concept.
Use this transform Scala script:
import com.johnsnowlabs.nlp._
import com.johnsnowlabs.nlp.annotators._
import org.apache.spark.ml.Pipeline
import com.johnsnowlabs.nlp.annotators.sbd.pragmatic.SentenceDetector

def transform(inputDF: Dataset[Row]) : Dataset[Row] = {
  // Wrap the raw text column in the document annotation that Spark NLP expects
  val documentAssembler = new DocumentAssembler().setInputCol("plot_txt_en").setOutputCol("document")
  // Split each document into sentences
  val sentenceDetector = new SentenceDetector().setInputCols(Array("document")).setOutputCol("sentences")
  // Convert the sentence annotations into a plain array-of-strings column
  val finisher = new Finisher()
    .setInputCols("sentences")
    .setOutputCols("sentences_ss")
    .setOutputAsArray(true)
    .setCleanAnnotations(false)
  val pipeline = new Pipeline().setStages(Array(documentAssembler,sentenceDetector,finisher))
  // Drop the intermediate annotation columns so only sentences_ss is indexed
  pipeline.fit(inputDF).transform(inputDF).drop("document").drop("sentences")
}
Be sure to add the JohnSnowLabs:spark-nlp:1.4.2 package using Spark Shell Options.
Clean up data with SQL transformations
Fusion has a Local Filesystem connector that can handle files such as CSV and JSON files. Using the Parallel Bulk Loader lets you leverage features that are not in the Local Filesystem connector, such as using SQL to clean up the input data.
Use the following SQL to clean up the input data before indexing:
SELECT _c0 as user_id,
CAST(_c1 as INT) as age,
_c2 as gender,
_c3 as occupation,
_c4 as zip_code
FROM _input
Job JSON:
{
"type" : "parallel-bulk-loader",
"id" : "csv",
"format" : "csv",
"path" : "/Users/tjp/dev/lw/projects/fusion-spark-bootcamp/labs/movielens/ml-100k/u.user",
"readOptions" : [ {
"key" : "delimiter",
"value" : "|"
}, {
"key" : "header",
"value" : "false"
} ],
"outputCollection" : "users",
"clearDatasource" : false,
"defineFieldsUsingInputSchema" : true,
"atomicUpdates" : false,
"transformSql" : "SELECT _c0 as user_id, \n CAST(_c1 as INT) as age, \n _c2 as gender,\n _c3 as occupation,\n _c4 as zip_code \n FROM _input"
}
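To see what the SQL above does to the raw CSV columns, here is a minimal sketch that runs an equivalent query locally. It uses Python's built-in SQLite as a stand-in for Spark SQL, which is an assumption made only for illustration; the two sample rows are made up in the u.user layout. With header set to false, Spark names CSV columns positionally (_c0, _c1, and so on) and reads every value as a string, which is why the age column needs a cast.

```python
import sqlite3

# Stand-in for the DataFrame produced by the CSV reader: positional column
# names and string values. The rows are illustrative sample data.
rows = [
    ("1", "24", "M", "technician", "85711"),
    ("2", "53", "F", "other", "94043"),
]

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE _input (_c0 TEXT, _c1 TEXT, _c2 TEXT, _c3 TEXT, _c4 TEXT)")
conn.executemany("INSERT INTO _input VALUES (?, ?, ?, ?, ?)", rows)

# Same shape as the transformSql value: rename the columns and cast age.
cleanup_sql = """
SELECT _c0 AS user_id,
       CAST(_c1 AS INT) AS age,
       _c2 AS gender,
       _c3 AS occupation,
       _c4 AS zip_code
FROM _input
"""

cursor = conn.execute(cleanup_sql)
columns = [description[0] for description in cursor.description]
cleaned = cursor.fetchall()

print(columns)
for row in cleaned:
    print(row)
```

The renamed columns become the field names in the output, and age comes back as an integer rather than a string. SQLite and Spark SQL differ in many details, so treat this only as a quick way to reason about the query before running the job.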
Read from S3
You can read from an S3 bucket without first pulling data down to your local workstation. To avoid exposing your AWS credentials, add them to a file named core-site.xml in the apps/spark-dist/conf directory, such as:
<configuration>
<property>
<name>fs.s3a.access.key</name>
<value>???</value>
</property>
<property>
<name>fs.s3a.secret.key</name>
<value>???</value>
</property>
</configuration>
Then you can load files using the S3a protocol, such as: s3a://sstk-dev/data/u.user. If you are running a Fusion cluster, then each instance of Fusion needs a core-site.xml file. S3a is the preferred protocol for reading data into Spark because it uses Amazon’s libraries to read from S3 instead of the legacy Hadoop libraries. If you need other S3 protocols (for example, s3 or s3n), you need to add the equivalent properties to core-site.xml.
You also need to add the org.apache.hadoop:hadoop-aws:2.7.3 package to the job using the --packages Spark option, and exclude the com.fasterxml.jackson.core:jackson-core,joda-time:joda-time packages using the --exclude-packages option.
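As a rough sketch, the two options could be expressed in the key/value list form that the shellOptions setting uses in the HBase example on this page. Whether your job passes them through shellOptions is an assumption here, and the helper name to_key_value_list is hypothetical.

```python
import json

def to_key_value_list(options):
    """Convert a dict into the list of {"key", "value"} objects used in job JSON."""
    return [{"key": key, "value": value} for key, value in options.items()]

# Values taken from the paragraph above.
packages = ["org.apache.hadoop:hadoop-aws:2.7.3"]
excluded = ["com.fasterxml.jackson.core:jackson-core", "joda-time:joda-time"]

# --packages takes groupId:artifactId:version coordinates, while
# --exclude-packages takes groupId:artifactId pairs without a version.
assert all(len(package.split(":")) == 3 for package in packages)
assert all(len(exclusion.split(":")) == 2 for exclusion in excluded)

shell_options = to_key_value_list({
    "--packages": ",".join(packages),
    "--exclude-packages": ",".join(excluded),
})

print(json.dumps({"shellOptions": shell_options}, indent=2))
```

The printed fragment can be merged into a job definition alongside the format, path, and outputCollection settings.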
You can also read from Google Cloud Storage (GCS), but you need a few more properties in your core-site.xml; see Installing the Cloud Storage connector.
Read from Parquet
Reading from parquet files is built into Spark using the "parquet" format. For additional read options, see Configuration of Parquet.
Job JSON:
{
"type" : "parallel-bulk-loader",
"id" : "ecomm demo parquet signals",
"format" : "parquet",
"path" : "./part-00000-c1951958-98ae-4f2a-b7b4-2e3a69fcf403-c000.snappy.parquet",
"outputCollection" : "best-buy_signals",
"clearDatasource" : false,
"defineFieldsUsingInputSchema" : true,
"atomicUpdates" : false
}
This example also uses the transformScala option to filter and transform the input DataFrame into a better form for indexing, using the following Scala script:
import java.util.Calendar
import java.util.Locale
import java.util.TimeZone

def transform(inputDF: Dataset[Row]) : Dataset[Row] = {
  // Keep only signals older than epoch second 1325376000 (2012-01-01 UTC)
  val signalsDF =
    inputDF.filter((unix_timestamp($"timestamp_tdt", "MM/dd/yyyy HH:mm:ss.SSS") < 1325376000))

  // Offset between now and the newest signal, in milliseconds
  val now = System.currentTimeMillis()
  val maxDate = signalsDF.agg(max("timestamp_tdt")).take(1)(0).getAs[java.sql.Timestamp](0).getTime
  val diff = now - maxDate

  // UDF that shifts a timestamp forward by the offset
  val add_time =
    udf((t: java.sql.Timestamp, diff : Long) => new java.sql.Timestamp(t.getTime + diff))

  // UDF that returns the day name of a timestamp, evaluated in UTC
  val day_of_week = udf((t: java.sql.Timestamp) => {
    val calendar = Calendar.getInstance(TimeZone.getTimeZone("UTC"))
    calendar.setTimeInMillis(t.getTime)
    calendar.getDisplayName(Calendar.DAY_OF_WEEK, Calendar.LONG, Locale.getDefault)
  })

  // Remap some columns to bring the timestamps current.
  // The patterns use yyyy (calendar year); YYYY is the week-based year and
  // yields the wrong year for some dates around New Year.
  signalsDF
    .withColumnRenamed("timestamp_tdt", "orig_timestamp_tdt")
    .withColumn("timestamp_tdt", add_time($"orig_timestamp_tdt", lit(diff)))
    .withColumn("date", $"timestamp_tdt")
    .withColumn("tx_timestamp_txt", date_format($"timestamp_tdt", "E yyyy-MM-d HH:mm:ss.SSS Z"))
    .withColumn("param.query_time_dt", $"timestamp_tdt")
    .withColumn("date_day", date_format(date_sub($"date", 0), "yyyy-MM-d'T'HH:mm:ss.SSS'Z'"))
    .withColumn("date_month", date_format(trunc($"date", "mm"), "yyyy-MM-d'T'HH:mm:ss.SSS'Z'"))
    .withColumn("date_year", date_format(trunc($"date", "yyyy"), "yyyy-MM-d'T'HH:mm:ss.SSS'Z'"))
    .withColumn("day_of_week", day_of_week($"date"))
}
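The core idea of the script above is the time shift: find the newest timestamp, measure how far it is from now, and move every row forward by that amount. The following is a minimal sketch of just that step in plain Python, outside Spark. The function name shift_to_present and the sample timestamps are hypothetical, and the current time is fixed so the result is repeatable.

```python
from datetime import datetime, timezone

DAY_NAMES = ["Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday"]

def shift_to_present(timestamps, now):
    """Shift every timestamp forward so that the newest one lands on `now`."""
    offset = now - max(timestamps)
    return [timestamp + offset for timestamp in timestamps]

# Hypothetical signal timestamps, both before the 2012-01-01 cutoff.
signals = [
    datetime(2011, 12, 30, 8, 0, tzinfo=timezone.utc),
    datetime(2011, 12, 31, 8, 0, tzinfo=timezone.utc),
]

# Fixed "now" for a repeatable example; the Scala script uses the system clock.
now = datetime(2024, 1, 1, 8, 0, tzinfo=timezone.utc)

shifted_signals = shift_to_present(signals, now)
for shifted in shifted_signals:
    # datetime.weekday() returns 0 for Monday, matching DAY_NAMES.
    print(shifted.isoformat(), DAY_NAMES[shifted.weekday()])
```

Because every row moves by the same offset, the spacing between signals is preserved, which keeps aggregations that depend on relative time meaningful.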
Read from JDBC tables
You can use the Parallel Bulk Loader to parallelize reads from JDBC tables, if the tables have numeric columns that can be partitioned into relatively equal partition sizes. In the example below, we partition the employees table into 4 partitions using the emp_no column (int). Behind the scenes, Spark sends four separate queries to the database and processes the result sets in parallel.
Load the JDBC driver JAR file into the Blob store
Before you ingest from a JDBC data source, you need to use the Fusion Admin UI to upload the JDBC driver JAR file into the blob store.
Alternatively, you can add the JAR file to the Fusion blob store with resourceType=spark:jar; for example:
curl -XPUT -H "Content-type:application/octet-stream" "http://localhost:8765/api/v1/blobs/mysql_jdbc_jar?resourceType=spark:jar" --data-binary @mysql-connector-java-5.1.45-bin.jar
At runtime, Fusion’s Spark job management framework adds any JAR files with resourceType=spark:jar from the blob store to the appropriate classpaths before running a Parallel Bulk Loader job.
Read from a table
For more information on reading from JDBC-compliant databases, see the Spark SQL documentation on JDBC data sources.
Job JSON:
{
"type" : "parallel-bulk-loader",
"id" : "load dbtable",
"format" : "jdbc",
"readOptions" : [ {
"key" : "url",
"value" : "jdbc:mysql://localhost/employees?user=?&password=?"
}, {
"key" : "dbtable",
"value" : "employees"
}, {
"key" : "partitionColumn",
"value" : "emp_no"
}, {
"key" : "numPartitions",
"value" : "4"
}, {
"key" : "driver",
"value" : "com.mysql.jdbc.Driver"
}, {
"key" : "lowerBound",
"value" : "$MIN(emp_no)"
}, {
"key" : "upperBound",
"value" : "$MAX(emp_no)"
} ],
"outputCollection" : "employees",
"clearDatasource" : false,
"defineFieldsUsingInputSchema" : true,
"atomicUpdates" : false
}
Notice the use of the $MIN(emp_no) and $MAX(emp_no) macros in the read options. These are macros offered by the Parallel Bulk Loader to help configure parallel reads of JDBC tables. Behind the scenes, the macros are translated into SQL queries to get the MAX and MIN values of the specified field, which Spark uses to compute splits for partitioned queries. As mentioned above, the field must be numeric and must have a relatively balanced distribution of values between MAX and MIN; otherwise, you are unlikely to see much performance benefit from partitioning.
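To make the split computation concrete, here is a simplified sketch of how a numeric range can be divided into one WHERE clause per partition. It is modeled on the stride-based splitting that Spark's JDBC source performs, but it is not Spark's code. The function name partition_predicates is hypothetical, and the bounds 10001 and 499999 are assumed values standing in for the results of $MIN(emp_no) and $MAX(emp_no).

```python
def partition_predicates(column, lower, upper, num_partitions):
    """Return one WHERE clause per partition for a numeric column.

    Simplified sketch: assumes non-negative integer bounds with lower < upper.
    """
    if num_partitions < 2:
        return []  # a single partition reads the whole table, no predicate needed

    stride = upper // num_partitions - lower // num_partitions
    predicates = []
    current = lower
    for index in range(num_partitions):
        lower_clause = f"{column} >= {current}" if index > 0 else None
        current += stride
        upper_clause = f"{column} < {current}" if index < num_partitions - 1 else None
        if lower_clause is None:
            # The first partition also picks up rows below the bound and NULLs.
            predicates.append(f"{upper_clause} OR {column} IS NULL")
        elif upper_clause is None:
            # The last partition is open-ended so rows above the bound are kept.
            predicates.append(lower_clause)
        else:
            predicates.append(f"{lower_clause} AND {upper_clause}")
    return predicates

# Assumed bounds, standing in for the $MIN(emp_no) and $MAX(emp_no) results.
predicates = partition_predicates("emp_no", 10001, 499999, 4)
for predicate in predicates:
    print(predicate)
```

Each clause becomes one of the four queries sent to the database. The bounds only decide where the splits fall; they do not filter rows, so a skewed distribution of emp_no values produces uneven partitions.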
Index HBase tables
To index an HBase table, use the Hortonworks connector.
The Parallel Bulk Loader lets us replace the HBase Indexer.
You need to add an hbase-site.xml (and possibly core-site.xml) to apps/spark-dist/conf in Fusion, for example:
<configuration>
<property>
<name>hbase.defaults.for.version.skip</name>
<value>true</value>
</property>
<property>
<name>hbase.zookeeper.quorum</name>
<value>localhost:2181</value>
</property>
<property>
<name>zookeeper.znode.parent</name>
<value>/hbase</value>
</property>
</configuration>
For this example, we will create a test table in HBase. If you already have a table in HBase, feel free to use that table instead.
- Launch the HBase shell and create a table named fusion_nums with a single column family named lw:
  create 'fusion_nums', 'lw'
- Do a list command to see your table:
  hbase(main):002:0> list
  TABLE
  fusion_nums
  1 row(s) in 0.0250 seconds
  => ["fusion_nums"]
- Fill the table with some data:
  for i in '1'..'100' do
    for j in '1'..'2' do
      put 'fusion_nums', "row#{i}", "lw:c#{j}", "#{i}#{j}"
    end
  end
- Scan the fusion_nums table to see your data:
  scan 'fusion_nums'
The HBase connector requires a catalog read option that defines the columns you want to read and how to map them into a Spark DataFrame. For our sample table, the following suffices:
{
"table":{"namespace":"default", "name":"fusion_nums"},
"rowkey":"key",
"columns":{
"id":{"cf":"rowkey", "col":"key", "type":"string"},
"lw_c1_s":{"cf":"lw", "col":"c1", "type":"string"},
"lw_c2_s":{"cf":"lw", "col":"c2", "type":"string"}
}
}
Notice the use of the $lastTimestamp macro in the read options. This lets us filter rows read from HBase using the timestamp of the last document the Parallel Bulk Loader wrote to Solr, that is, to get the newest updates from HBase only (incremental updates). Most Spark data sources provide a way to filter results based on timestamp.
Job JSON:
{
"type" : "parallel-bulk-loader",
"id" : "hbase",
"format" : "org.apache.spark.sql.execution.datasources.hbase",
"readOptions" : [ {
"key" : "catalog",
"value" : "{ \"table\":{\"namespace\":\"default\", \"name\":\"fusion_nums\"}, \"rowkey\":\"key\", \"columns\":{ \"id\":{\"cf\":\"rowkey\", \"col\":\"key\", \"type\":\"string\"}, \"lw_c1_s\":{\"cf\":\"lw\", \"col\":\"c1\", \"type\":\"string\"}, \"lw_c2_s\":{\"cf\":\"lw\", \"col\":\"c2\", \"type\":\"string\"} } }"
}, {
"key" : "minStamp",
"value" : "$lastTimestamp(EPOCH_MS)"
} ],
"outputCollection" : "hbase",
"timestampFieldName" : "timestamp_tdt",
"clearDatasource" : false,
"defineFieldsUsingInputSchema" : true,
"atomicUpdates" : false,
"shellOptions" : [ {
"key" : "--packages",
"value" : "com.hortonworks:shc-core:1.1.1-2.1-s_2.11"
}, {
"key" : "--repositories",
"value" : "http://repo.hortonworks.com/content/repositories/releases"
} ]
}
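The catalog read option in the job above is a JSON document stored inside a JSON string, so every inner quote has to be escaped. Escaping by hand is error-prone. One way to avoid it is to build the value programmatically, as in this minimal sketch. It covers only the format and readOptions parts of the job, and the variable names are illustrative.

```python
import json

# Catalog from the example above, written as a regular Python dict.
catalog = {
    "table": {"namespace": "default", "name": "fusion_nums"},
    "rowkey": "key",
    "columns": {
        "id": {"cf": "rowkey", "col": "key", "type": "string"},
        "lw_c1_s": {"cf": "lw", "col": "c1", "type": "string"},
        "lw_c2_s": {"cf": "lw", "col": "c2", "type": "string"},
    },
}

read_options = [
    # json.dumps turns the catalog into a string; dumping the enclosing job
    # below then escapes the inner quotes automatically.
    {"key": "catalog", "value": json.dumps(catalog)},
    {"key": "minStamp", "value": "$lastTimestamp(EPOCH_MS)"},
]

job_fragment = {
    "format": "org.apache.spark.sql.execution.datasources.hbase",
    "readOptions": read_options,
}

serialized = json.dumps(job_fragment, indent=2)
print(serialized)
```

Parsing the catalog value back with json.loads is a quick check that the embedded document survived the round trip before you submit the job.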
Index Elastic data
With Elasticsearch 6.2.2 using the org.elasticsearch:elasticsearch-spark-20_2.11:6.2.1 package, here is a Scala script to run in bin/spark-shell to index some test data:
import spark.implicits._

case class SimpsonCharacter(name: String, actor: String, episodeDebut: String)

val simpsonsDF = sc.parallelize(
  SimpsonCharacter("Homer", "Dan Castellaneta", "Good Night") ::
  SimpsonCharacter("Marge", "Julie Kavner", "Good Night") ::
  SimpsonCharacter("Bart", "Nancy Cartwright", "Good Night") ::
  SimpsonCharacter("Lisa", "Yeardley Smith", "Good Night") ::
  SimpsonCharacter("Maggie", "Liz Georges and more", "Good Night") ::
  SimpsonCharacter("Sideshow Bob", "Kelsey Grammer", "The Telltale Head") :: Nil).toDF()

// Connection settings for the local Elasticsearch node.
// The target resource is passed to save(), so it is not repeated here.
val writeOpts = Map("es.nodes" -> "127.0.0.1",
  "es.port" -> "9200",
  "es.index.auto.create" -> "true")

// Apply the settings to the writer; without options(...) the map is never used
simpsonsDF.write.format("org.elasticsearch.spark.sql").options(writeOpts).mode("Overwrite").save("shows/simpsons")
Job JSON:
{
"type" : "parallel-bulk-loader",
"id" : "elastic",
"format" : "org.elasticsearch.spark.sql",
"readOptions" : [ {
"key" : "es.nodes",
"value" : "127.0.0.1"
}, {
"key" : "es.port",
"value" : "9200"
}, {
"key" : "es.resource",
"value" : "shows/simpsons"
} ],
"outputCollection" : "hbase_signals_aggr",
"clearDatasource" : false,
"defineFieldsUsingInputSchema" : true,
"atomicUpdates" : false,
"shellOptions" : [ {
"key" : "--packages",
"value" : "org.elasticsearch:elasticsearch-spark-20_2.11:6.2.2"
} ]
}
Read from Couchbase
To index a Couchbase bucket, use the official Couchbase Spark connector.
For this example, we will create a test bucket in Couchbase. If you already have a bucket in Couchbase, feel free to use that and skip to the test data setup section. This test was performed using Couchbase Server 6.0.0.
- Create a bucket named test in the Couchbase admin UI. Give access to a system account user to use in the Parallel Bulk Loader job config.
- Connect to Couchbase using the command line client cbq. For example:
  cbq -e=http://<host>:8091 -u <user> -p <password>
  Ensure the provided user is an authorized user of the test bucket.
- Create a primary index on the test bucket:
  CREATE PRIMARY INDEX `test-primary-index` ON `test` USING GSI;
- Insert some data:
  INSERT INTO `test` ( KEY, VALUE ) VALUES ( "1", { "id": "01", "field1": "a value", "field2": "another value"} ) RETURNING META().id as docid, *;
- Verify you can query the document just inserted:
  select * from `test`;
To ingest from this bucket with the Parallel Bulk Loader, use the Couchbase Spark connector by specifying the format com.couchbase.spark.sql.DefaultSource. Then specify the com.couchbase.client:spark-connector_2.11:2.2.0 package as the Spark shell --packages option, as well as a few Spark settings that direct the connector to a particular Couchbase server and bucket to connect to using the provided credentials. See the Couchbase Spark connector documentation for all of the available Spark configuration settings.
Putting it all together:
XML setup
XML is a supported format that requires settings for format and --packages. In addition, you must specify the file path in the readOptions section. For example:
{
  "type": "parallel-bulk-loader",
  "id": "pbl",
  "format": "com.databricks.spark.xml",
  "path": "this-is-ignored",
  "readOptions": [ {
    "key": "rowTag",
    "value": "tag_name_representing_record"
  }, {
    "key": "path",
    "value": "/home/user/file.xml.gz"
  } ],
  "outputCollection": "output_collection_name",
  "clearDatasource": true,
  "defineFieldsUsingInputSchema": true,
  "atomicUpdates": false,
  "transformScala": "",
  "shellOptions": [ {
    "key": "--packages",
    "value": "com.databricks:spark-xml_2.11:0.5.0"
  } ],
  "cacheAfterRead": false
}