Provides classes to simplify usage of the Java Addressing API. Example usage of the UDF in Spark:
/**
 * Example Spark job that geocodes a CSV of addresses with the Addressing UDF.
 *
 * Reads header-ed CSV input from HDFS, applies the geocode UDF to each row's
 * street/location address columns, flattens the struct result into top-level
 * columns, and writes the output back to HDFS as CSV.
 */
object GeocodeExample {
  def main(args: Array[String]): Unit = {
    val session = SparkSession.builder().appName(this.getClass.getName).getOrCreate()
    // Build a download manager capable of downloading remote resources to a node-local path.
    val downloadManager = new DownloadManagerBuilder("/addressing/downloads")
      .addDownloader(new S3Downloader(session.sparkContext.hadoopConfiguration))
      .addDownloader(new HDFSDownloader(session.sparkContext.hadoopConfiguration))
      .addDownloader(new LocalFilePassthroughDownloader())
      .build()
    // Read the input records.
    val input = session.read.option("header", true).csv(path = "hdfs:///addressing/input/")
    // Build the udf.
    val geocodeUdf: UserDefinedFunction = new AddressingBuilder()
      .withDownloadManager(downloadManager)
      .withResourcesLocation("hdfs:///addressing/resources/")
      .withDataLocations("hdfs:///addressing/reference_data/")
      .udfBuilder()
      // Customize the preference to return extra information like pb_key.
      .withPreferences(new PreferencesBuilder().withReturnAllInfo(true).build())
      .withOutputFields(
        "address.formattedStreetAddress as formattedStreetAddress",
        "address.formattedLocationAddress as formattedLocationAddress",
        "location.feature.geometry.coordinates.x as x",
        "location.feature.geometry.coordinates.y as y",
        "customFields['PB_KEY'] as 'PB_KEY'"
      )
      .withErrorField("error")
      .forGeocode()
    // The UDF takes a map of field name -> column value; addressLines[i] are the input address lines.
    val output = input.withColumn("result", geocodeUdf(map(
        lit("addressLines[0]"), col("streetAddress"),
        lit("addressLines[1]"), col("locationAddress")
      )))
      // Persist so that we don't run the udf for each field in the subsequent unrolling of the result field.
      .persist()
      .select("*", "result.*").drop(colName = "result")
    output.write.mode(SaveMode.Overwrite).option("header", true).csv("hdfs:///addressing/output")
  }
}
The example above shows typical usage of the geocoding UDF in Spark.