Package com.precisely.bigdata.addressing.spark

package spark

Value Members

  1. package api

    Provides classes to simplify usage of the Java Addressing API. Example usage of the UDF in Spark:

    import org.apache.spark.sql.{SaveMode, SparkSession}
    import org.apache.spark.sql.expressions.UserDefinedFunction
    import org.apache.spark.sql.functions.{col, lit, map}
    // the Addressing SDK classes used below (AddressingBuilder, PreferencesBuilder, DownloadManagerBuilder
    // and the downloader implementations) must also be imported from their SDK packages
    
    object GeocodeExample {
      def main(args: Array[String]): Unit = {
        val session = SparkSession.builder().appName(this.getClass.getName).getOrCreate()
    
        //build a download manager capable of downloading remote resources to a node local path
        val downloadManager = new DownloadManagerBuilder("/addressing/downloads")
          .addDownloader(new S3Downloader(session.sparkContext.hadoopConfiguration))
          .addDownloader(new HDFSDownloader(session.sparkContext.hadoopConfiguration))
          .addDownloader(new LocalFilePassthroughDownloader())
          .build()
    
        //read the input records
        val input = session.read.option("header", true).csv(path = "hdfs:///addressing/input/")
    
        //build the udf
        val geocodeUdf: UserDefinedFunction = new AddressingBuilder()
          .withDownloadManager(downloadManager)
          .withResourcesLocation("hdfs:///addressing/resources/")
          .withDataLocations("hdfs:///addressing/reference_data/")
          .udfBuilder()
          //customize the preference to return extra information like pb_key
          .withPreferences(new PreferencesBuilder().withReturnAllInfo(true).build())
          .withOutputFields(
            "address.formattedStreetAddress as formattedStreetAddress",
            "address.formattedLocationAddress as formattedLocationAddress",
            "location.feature.geometry.coordinates.x as x",
            "location.feature.geometry.coordinates.y as y",
            "customFields['PB_KEY'] as 'PB_KEY'"
          )
          .withErrorField("error")
          .forGeocode()
    
        val output = input.withColumn("result", geocodeUdf(map(
          lit("addressLines[0]"), col("streetAddress"),
          lit("addressLines[1]"), col("locationAddress")
        )))
          //persist so that we don't run the udf for each field in the subsequent unrolling of the result field
          .persist()
          .select("*", "result.*").drop(colName = "result")
    
        output.write.mode(SaveMode.Overwrite).option("header", true).csv("hdfs:///addressing/output")
      }
    }
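
    Because the builder above maps UDF failures to an output column via .withErrorField("error"), failed lookups can be separated from successful ones after the UDF has run. A minimal sketch, assuming the output DataFrame and the input column names from the example above:

    // rows where the addressing UDF reported a problem carry a populated "error" column
    val failed = output.filter(col("error").isNotNull)
    failed.select("streetAddress", "locationAddress", "error").show(truncate = false)

    Capturing errors in a column rather than failing the job lets the successfully geocoded rows be written out normally, while the failed rows can be inspected or re-submitted separately.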
