Driver Program

This section demonstrates how to use the Geo Addressing SDK classes and APIs through a sample Python driver program:

AddressingExample.py
# Importing the Required Classes
from addressing.DownloadManagerBuilder import DownloadManagerBuilder
from addressing.AddressingBuilder import AddressingBuilder
from addressing.HDFSDownloader import HDFSDownloader
from addressing.S3Downloader import S3Downloader
from addressing.LocalFilePassthroughDownloader import LocalFilePassthroughDownloader
from addressing.PreferencesBuilder import PreferencesBuilder
from addressing.HadoopConfiguration import HadoopConfiguration
from addressing.UDFExecutor import UDFExecutor

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, create_map, lit

# The Main Method
if __name__ == "__main__":

    # Create the Spark session. In PySpark `builder` is an attribute, not a
    # method, so it is accessed without parentheses.
    spark = SparkSession.builder.appName("AddressingExample").getOrCreate()

    # Create the Download Manager and register the S3, HDFS and local-file
    # downloaders. The S3 and HDFS downloaders are each given a Hadoop
    # configuration obtained from HadoopConfiguration.
    # NOTE(review): "/addressing/downloads" is presumably the directory that
    # downloaded data is stored in -- confirm against the SDK documentation.
    downloadManager = DownloadManagerBuilder("/addressing/downloads") \
        .addDownloader(S3Downloader(HadoopConfiguration().getHadoopConfiguration()).getDownloader()) \
        .addDownloader(HDFSDownloader(HadoopConfiguration().getHadoopConfiguration()).getDownloader()) \
        .addDownloader(LocalFilePassthroughDownloader().getDownloader()) \
        .build()

    # Read the input data (CSV with a header row). The DataFrame is named
    # `inputTable` rather than `input` so it does not shadow the builtin.
    inputTable = spark.read.option("header", True).csv(path="hdfs:///addressing/input/")

    # Configure the AddressingBuilder with the download manager and the
    # resources / reference data locations.
    addressingBuilder = AddressingBuilder() \
        .withDownloadManager(downloadManager) \
        .withResourcesLocation("hdfs:///addressing/resources/") \
        .withDataLocations("hdfs:///addressing/reference_data/")

    # Configure the UDFBuilder: preferences, the fields to project from the
    # result, and the name of the field that receives errors.
    udfBuilder = addressingBuilder.udfBuilder() \
        .withPreferences(PreferencesBuilder().withReturnAllInfo(True).build()) \
        .withOutputFields("address.formattedStreetAddress as formattedStreetAddress",
                          "address.formattedLocationAddress as formattedLocationAddress",
                          "location.feature.geometry.coordinates.x as x",
                          "location.feature.geometry.coordinates.y as y",
                          "customFields['PB_KEY'] as 'PB_KEY'") \
        .withErrorField("error")

    # Create UDF for Geocode Operation
    geocodeUDF = udfBuilder.forGeocode()

    # Map the request field names expected by the UDF to the input columns
    # that supply their values.
    addressFields = create_map(
        lit("addressLines[0]"), col("streetAddress"),
        lit("addressLines[1]"), col("locationAddress"),
        lit("country"), col("country"),
    )

    # Execute the UDF: store its struct output in a temporary "result" column,
    # persist, then flatten the struct into top-level columns and drop the
    # temporary column. The chain is wrapped in parentheses so it can span
    # several lines without backslash continuations.
    output = (
        inputTable
        .withColumn("result", UDFExecutor().apply(geocodeUDF, addressFields))
        .persist()
        .select("*", "result.*")
        .drop("result")
    )

    # Save the output result in the HDFS
    output.write.mode("overwrite").option("header", True).csv("hdfs:///addressing/output")