diff --git a/pubsublite/spark-connector/spark_streaming_to_pubsublite_example.py b/pubsublite/spark-connector/spark_streaming_to_pubsublite_example.py
index 3ab89ffe4ef..4c37b8235c8 100644
--- a/pubsublite/spark-connector/spark_streaming_to_pubsublite_example.py
+++ b/pubsublite/spark-connector/spark_streaming_to_pubsublite_example.py
@@ -20,6 +20,7 @@ def spark_streaming_to_pubsublite(
 ) -> None:
     # [START pubsublite_spark_streaming_to_pubsublite]
     from pyspark.sql import SparkSession
+    from pyspark.sql.functions import array, create_map, col, lit, when
     from pyspark.sql.types import BinaryType, StringType
 
     import uuid
@@ -35,13 +36,32 @@ def spark_streaming_to_pubsublite(
     #  |-- value: long (nullable = true)
     sdf = spark.readStream.format("rate").option("rowsPerSecond", 1).load()
 
+    # Transform the dataframe to match the required data fields and data types:
+    # https://github.com/googleapis/java-pubsublite-spark#data-schema
     sdf = (
-        sdf.withColumn("key", (sdf.value % 5).cast(StringType()).cast(BinaryType()))
-        .withColumn("event_timestamp", sdf.timestamp)
-        .withColumn("data", sdf.value.cast(StringType()).cast(BinaryType()))
-        .drop("value", "timestamp")
+        sdf.withColumn("key", lit("example").cast(BinaryType()))
+        .withColumn("data", col("value").cast(StringType()).cast(BinaryType()))
+        .withColumnRenamed("timestamp", "event_timestamp")
+        # Populate the attributes field. For example, a row with an even value
+        # will have attributes {"key1": [b"even"]}.
+        .withColumn(
+            "attributes",
+            create_map(
+                lit("key1"),
+                array(when(col("value") % 2 == 0, b"even").otherwise(b"odd")),
+            ),
+        )
+        .drop("value")
     )
 
+    # After the transformation, the schema of the dataframe should look like:
+    #  |-- key: binary (nullable = false)
+    #  |-- data: binary (nullable = true)
+    #  |-- event_timestamp: timestamp (nullable = true)
+    #  |-- attributes: map (nullable = false)
+    #  |    |-- key: string
+    #  |    |-- value: array (valueContainsNull = false)
+    #  |    |    |-- element: binary (containsNull = false)
     sdf.printSchema()
 
     query = (
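
Not part of the diff, but a minimal sketch for reviewing it locally: the same transformation applied to a two-row batch DataFrame, so the resulting schema and the even/odd attributes logic can be inspected without a streaming query or a Pub/Sub Lite topic. The app name and sample timestamps are illustrative; it assumes only a local PySpark installation.

from datetime import datetime

from pyspark.sql import SparkSession
from pyspark.sql.functions import array, col, create_map, lit, when
from pyspark.sql.types import BinaryType, StringType

spark = SparkSession.builder.appName("schema-check").getOrCreate()

# Two rows that mimic the streaming rate source: (timestamp, value).
df = spark.createDataFrame(
    [(datetime(2024, 1, 1, 0, 0, 0), 0), (datetime(2024, 1, 1, 0, 0, 1), 1)],
    ["timestamp", "value"],
)

# The same transformation as in the diff above, applied in batch mode.
df = (
    df.withColumn("key", lit("example").cast(BinaryType()))
    .withColumn("data", col("value").cast(StringType()).cast(BinaryType()))
    .withColumnRenamed("timestamp", "event_timestamp")
    .withColumn(
        "attributes",
        create_map(
            lit("key1"),
            array(when(col("value") % 2 == 0, b"even").otherwise(b"odd")),
        ),
    )
    .drop("value")
)

df.printSchema()
# Row with value 0 gets attributes {key1 -> [even]}; value 1 gets {key1 -> [odd]}.
df.show(truncate=False)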