Driver Program
This section describes the usage of Geo Addressing SDK Classes and APIs through a sample Python Driver Class:
# Importing the Required Classes
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, create_map, lit

from addressing.DownloadManagerBuilder import DownloadManagerBuilder
from addressing.AddressingBuilder import AddressingBuilder
from addressing.HDFSDownloader import HDFSDownloader
from addressing.S3Downloader import S3Downloader
from addressing.LocalFilePassthroughDownloader import LocalFilePassthroughDownloader
from addressing.PreferencesBuilder import PreferencesBuilder
from addressing.HadoopConfiguration import HadoopConfiguration
from addressing.UDFExecutor import UDFExecutor
# The Main Method
#
# Sample driver: reads address records from CSV, geocodes every row through
# a Geo Addressing UDF, and writes the flattened result back out as CSV.
if __name__ == "__main__":
    # Create Spark Session.
    # In PySpark ``SparkSession.builder`` is an attribute, not a method,
    # so it must not be called with ``()`` as it is in the Scala API.
    spark = SparkSession.builder.appName("AddressingExample").getOrCreate()

    # Create object of Download Manager.
    # One downloader is registered per storage scheme (S3, HDFS, local files);
    # "/addressing/downloads" is the path handed to the builder.
    downloadManager = (
        DownloadManagerBuilder("/addressing/downloads")
        .addDownloader(S3Downloader(HadoopConfiguration().getHadoopConfiguration()).getDownloader())
        .addDownloader(HDFSDownloader(HadoopConfiguration().getHadoopConfiguration()).getDownloader())
        .addDownloader(LocalFilePassthroughDownloader().getDownloader())
        .build()
    )

    # Read the input data.
    # The first CSV row is treated as the header. The DataFrame is named
    # ``inputTable`` to match its use below and to avoid shadowing the
    # ``input`` builtin.
    inputTable = spark.read.option("header", True).csv("hdfs:///addressing/input/")

    # Configure the AddressingBuilder.
    addressingBuilder = (
        AddressingBuilder()
        .withDownloadManager(downloadManager)
        .withResourcesLocation("hdfs:///addressing/resources/")
        .withDataLocations("hdfs:///addressing/reference_data/")
    )

    # Configure the UDFBuilder.
    # Each output field expression is aliased to the column name it will have
    # inside the result struct; errors are reported in the "error" field.
    udfBuilder = (
        addressingBuilder.udfBuilder()
        .withPreferences(PreferencesBuilder().withReturnAllInfo(True).build())
        .withOutputFields(
            "address.formattedStreetAddress as formattedStreetAddress",
            "address.formattedLocationAddress as formattedLocationAddress",
            "location.feature.geometry.coordinates.x as x",
            "location.feature.geometry.coordinates.y as y",
            "customFields['PB_KEY'] as 'PB_KEY'",
        )
        .withErrorField("error")
    )

    # Create UDF for Geocode Operation.
    geocodeUDF = udfBuilder.forGeocode()

    # Execute the UDF.
    # The UDF input is a map from request field name to input column.
    # The "result" struct is expanded into top-level columns and then dropped.
    # NOTE(review): persist() presumably keeps the geocode UDF from being
    # re-evaluated when the struct is expanded -- confirm.
    output = (
        inputTable.withColumn(
            "result",
            UDFExecutor().apply(
                geocodeUDF,
                create_map(
                    lit("addressLines[0]"), col("streetAddress"),
                    lit("addressLines[1]"), col("locationAddress"),
                    lit("country"), col("country"),
                ),
            ),
        )
        .persist()
        .select("*", "result.*")
        # PySpark's drop() takes column names positionally; the Scala-style
        # ``colName=`` keyword is not accepted.
        .drop("result")
    )

    # Save the output result in the HDFS.
    output.write.mode("overwrite").option("header", True).csv("hdfs:///addressing/output")
|