feat(pubsublite): attributes field in sink example #7405
Changes from all commits
```diff
@@ -20,6 +20,7 @@ def spark_streaming_to_pubsublite(
 ) -> None:
     # [START pubsublite_spark_streaming_to_pubsublite]
     from pyspark.sql import SparkSession
+    from pyspark.sql.functions import array, create_map, col, lit, when
     from pyspark.sql.types import BinaryType, StringType
     import uuid
@@ -35,13 +36,32 @@ def spark_streaming_to_pubsublite(
     # |-- value: long (nullable = true)
     sdf = spark.readStream.format("rate").option("rowsPerSecond", 1).load()

     # Transform the dataframe to match the required data fields and data types:
     # https://github.com/googleapis/java-pubsublite-spark#data-schema
     sdf = (
-        sdf.withColumn("key", (sdf.value % 5).cast(StringType()).cast(BinaryType()))
-        .withColumn("event_timestamp", sdf.timestamp)
-        .withColumn("data", sdf.value.cast(StringType()).cast(BinaryType()))
-        .drop("value", "timestamp")
+        sdf.withColumn("key", lit("example").cast(BinaryType()))
+        .withColumn("data", col("value").cast(StringType()).cast(BinaryType()))
+        .withColumnRenamed("timestamp", "event_timestamp")
+        # Populate the attributes field. For example, an even value will
+        # have {"key1", [b"even"]}.
+        .withColumn(
+            "attributes",
+            create_map(
+                lit("key1"),
+                array(when(col("value") % 2 == 0, b"even").otherwise(b"odd")),
+            ),
+        )
+        .drop("value")
     )

     # After the transformation, the schema of the dataframe should look like:
+    # |-- key: binary (nullable = false)
     # |-- data: binary (nullable = true)
     # |-- event_timestamp: timestamp (nullable = true)
+    # |-- attributes: map (nullable = false)
```
**jiangmichaellll (Member)** commented on `# |-- attributes: map (nullable = false)`:

> Our guide and everything else actually specify that `attributes` is nullable.
>
> But our code doesn't really check, since they are compatible, i.e. our codebase is OK with nullable. I think it's fine.

**Author** replied:

> @jiangmichaellll Yes, all the fields can be null. This is just what is printed out for the dataframe created by the code above.
```diff
+    # |    |-- key: string
+    # |    |-- value: array (valueContainsNull = false)
+    # |    |    |-- element: binary (containsNull = false)
     sdf.printSchema()

     query = (
```
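To make the new transform concrete, here is a plain-Python sketch of what the PySpark column expressions compute for a single row. This is an illustration only, not part of the sample; `to_pubsublite_row` is a hypothetical helper, and the real code operates on Spark columns, not Python ints:

```python
def to_pubsublite_row(value: int) -> dict:
    """Mirror the per-row result of the PySpark transform in this PR (sketch)."""
    return {
        # lit("example").cast(BinaryType()) -> a constant, non-null binary key
        "key": b"example",
        # col("value").cast(StringType()).cast(BinaryType()) -> value as UTF-8 bytes
        "data": str(value).encode("utf-8"),
        # create_map(lit("key1"), array(when(col("value") % 2 == 0, b"even").otherwise(b"odd")))
        "attributes": {"key1": [b"even" if value % 2 == 0 else b"odd"]},
    }

for v in range(4):
    print(v, to_pubsublite_row(v)["attributes"])
```

This matches the comment in the diff: an even `value` yields `{"key1": [b"even"]}`, an odd one `{"key1": [b"odd"]}`.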