diff --git a/docs/BigQueryTable-batchsource.md b/docs/BigQueryTable-batchsource.md index 7eec12e74..b36fd3786 100644 --- a/docs/BigQueryTable-batchsource.md +++ b/docs/BigQueryTable-batchsource.md @@ -57,6 +57,14 @@ name is not null', all output rows will have an 'age' over 50 and a value for th This is the same as the WHERE clause in BigQuery. More information can be found at https://cloud.google.com/bigquery/docs/reference/standard-sql/query-syntax#where_clause +**Order By**: The column or list of columns to order the data by. For +example, `name asc, age desc`. More information can be found +at https://cloud.google.com/bigquery/docs/reference/standard-sql/query-syntax#order_by_clause. + +**Limit**: The maximum number of rows to read from the source table. More information can be +found +at https://cloud.google.com/bigquery/docs/reference/standard-sql/query-syntax#limit_and_offset_clause. + **Enable Querying Views**: Whether to allow querying views. Since BigQuery views are not materialized by default, querying them may have a performance overhead. 
diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/source/BigQuerySource.java b/src/main/java/io/cdap/plugin/gcp/bigquery/source/BigQuerySource.java index 0b0a9efb1..b7da7d4bd 100644 --- a/src/main/java/io/cdap/plugin/gcp/bigquery/source/BigQuerySource.java +++ b/src/main/java/io/cdap/plugin/gcp/bigquery/source/BigQuerySource.java @@ -267,6 +267,12 @@ private void configureBigQuerySource() { if (config.getViewMaterializationDataset() != null) { configuration.set(BigQueryConstants.CONFIG_VIEW_MATERIALIZATION_DATASET, config.getViewMaterializationDataset()); } + if (config.getOrderBy() != null) { + configuration.set(BigQueryConstants.CONFIG_ORDER_BY, config.getOrderBy()); + } + if (config.getLimit() != null) { + configuration.set(BigQueryConstants.CONFIG_LIMIT, String.valueOf(config.getLimit())); + } configuration.set(BigQueryConstants.CONFIG_BQ_HTTP_READ_TIMEOUT, String.valueOf(config.getReadTimeout())); } diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/source/BigQuerySourceConfig.java b/src/main/java/io/cdap/plugin/gcp/bigquery/source/BigQuerySourceConfig.java index 36bb40a8d..8b092438a 100644 --- a/src/main/java/io/cdap/plugin/gcp/bigquery/source/BigQuerySourceConfig.java +++ b/src/main/java/io/cdap/plugin/gcp/bigquery/source/BigQuerySourceConfig.java @@ -58,6 +58,7 @@ public final class BigQuerySourceConfig extends BigQueryBaseConfig { private static final String VALID_DATE_FORMAT = "yyyy-MM-dd"; private static final String SCHEME = "gs://"; private static final String WHERE = "WHERE"; + private static final String ORDER_BY = "ORDER BY"; private static final String NAME_READ_TIMEOUT = "readTimeout"; public static final Set SUPPORTED_TYPES = ImmutableSet.of(Schema.Type.LONG, Schema.Type.STRING, Schema.Type.DOUBLE, Schema.Type.BOOLEAN, Schema.Type.BYTES, @@ -71,6 +72,8 @@ public final class BigQuerySourceConfig extends BigQueryBaseConfig { public static final String NAME_ENABLE_QUERYING_VIEWS = "enableQueryingViews"; public static final String 
NAME_VIEW_MATERIALIZATION_PROJECT = "viewMaterializationProject"; public static final String NAME_VIEW_MATERIALIZATION_DATASET = "viewMaterializationDataset"; + public static final String NAME_LIMIT = "limit"; + public static final String NAME_ORDER_BY = "orderBy"; @Name(Constants.Reference.REFERENCE_NAME) @Nullable @@ -111,6 +114,19 @@ public final class BigQuerySourceConfig extends BigQueryBaseConfig { "and discards all rows that do not return TRUE (that is, rows that return FALSE or NULL).") private String filter; + @Name(NAME_ORDER_BY) + @Macro + @Nullable + @Description("The ORDER BY clause sorts the results of a query based on one or more columns. " + + "For example, 'name asc, age desc'.") + private String orderBy; + + @Name(NAME_LIMIT) + @Macro + @Nullable + @Description("The LIMIT clause restricts the number of rows returned by the query.") + private Long limit; + @Name(NAME_ENABLE_QUERYING_VIEWS) @Macro @Nullable @@ -187,6 +203,13 @@ public void validate(FailureCollector collector, Map arguments) if (!containsMacro(NAME_CMEK_KEY)) { validateCmekKey(collector, arguments); } + + if (!containsMacro(NAME_LIMIT) && limit != null) { + if (limit < 0) { + collector.addFailure("Invalid limit value.", "Limit must be a non-negative number.") + .withConfigProperty(NAME_LIMIT); + } + } } void validateCmekKey(FailureCollector collector, Map arguments) { @@ -282,17 +305,44 @@ public String getPartitionTo() { @Nullable public String getFilter() { - if (filter != null) { - filter = filter.trim(); - if (filter.isEmpty()) { - return null; - } - // remove the WHERE keyword from the filter if the user adds it at the begging of the expression - if (filter.toUpperCase().startsWith(WHERE)) { - filter = filter.substring(WHERE.length()); - } + return cleanupSqlFragment(filter, WHERE); + } + + @Nullable + public String getOrderBy() { + return cleanupSqlFragment(orderBy, ORDER_BY); + } + + @Nullable + public Long getLimit() { + return limit; + } + + /** + * Cleans up a SQL fragment 
by trimming whitespace and stripping a given keyword from the + * beginning of the string in a case-insensitive way. The keyword is only removed when it is a + * whole word, never when it is merely the prefix of a longer identifier. + * + * @param fragment The input SQL string fragment. + * @param keyword The SQL keyword to remove, without trailing whitespace (e.g., "WHERE", "ORDER BY"). + * @return The cleaned fragment, or null if the input was null, blank, or held nothing but the keyword. + */ + @Nullable + private String cleanupSqlFragment(@Nullable String fragment, String keyword) { + if (Strings.isNullOrEmpty(fragment)) { + return null; + } + + fragment = fragment.trim(); + + if (fragment.isEmpty()) { + return null; } - return filter; + + // Strip the keyword only when it ends at a word boundary, so that a column such as + // "whereabouts" is not mistaken for a leading WHERE. regionMatches is locale-independent. + boolean hasKeyword = fragment.regionMatches(true, 0, keyword, 0, keyword.length()); + if (hasKeyword && fragment.length() > keyword.length()) { + char next = fragment.charAt(keyword.length()); + hasKeyword = !Character.isLetterOrDigit(next) && next != '_'; + } + + if (hasKeyword) { + fragment = fragment.substring(keyword.length()).trim(); + } + + return fragment.isEmpty() ? null : fragment; } public boolean isEnableQueryingViews() { diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/source/PartitionedBigQueryInputFormat.java b/src/main/java/io/cdap/plugin/gcp/bigquery/source/PartitionedBigQueryInputFormat.java index b6d455c28..48569eaf1 100644 --- a/src/main/java/io/cdap/plugin/gcp/bigquery/source/PartitionedBigQueryInputFormat.java +++ b/src/main/java/io/cdap/plugin/gcp/bigquery/source/PartitionedBigQueryInputFormat.java @@ -22,6 +22,7 @@ import com.google.api.services.bigquery.model.JobReference; import com.google.api.services.bigquery.model.Table; import com.google.api.services.bigquery.model.TableReference; +import com.google.cloud.bigquery.RangePartitioning; import com.google.cloud.bigquery.StandardTableDefinition; import com.google.cloud.bigquery.TableDefinition.Type; import com.google.cloud.bigquery.TimePartitioning; @@ -52,6 +53,8 @@ import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import org.apache.hadoop.util.Progressable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.security.GeneralSecurityException; @@ -68,6 +71,7 @@ */ public class PartitionedBigQueryInputFormat extends AbstractBigQueryInputFormat { 
+ private static final Logger LOG = LoggerFactory.getLogger(PartitionedBigQueryInputFormat.class); private InputFormat delegateInputFormat = new AvroBigQueryInputFormatWithScopes(); @@ -132,19 +136,27 @@ private void processQuery(JobContext context) throws IOException, InterruptedExc String partitionFromDate = configuration.get(BigQueryConstants.CONFIG_PARTITION_FROM_DATE, null); String partitionToDate = configuration.get(BigQueryConstants.CONFIG_PARTITION_TO_DATE, null); String filter = configuration.get(BigQueryConstants.CONFIG_FILTER, null); + String limit = configuration.get(BigQueryConstants.CONFIG_LIMIT, null); + String orderBy = configuration.get(BigQueryConstants.CONFIG_ORDER_BY, null); Integer readTimeout = configuration.getInt(BigQueryConstants.CONFIG_BQ_HTTP_READ_TIMEOUT, GCPUtils.BQ_DEFAULT_READ_TIMEOUT_SECONDS); com.google.cloud.bigquery.Table bigQueryTable = BigQueryUtil.getBigQueryTable( - datasetProjectId, datasetId, tableName, serviceAccount, isServiceAccountFilePath, null, readTimeout); + datasetProjectId, datasetId, tableName, serviceAccount, isServiceAccountFilePath, null, + readTimeout); Type type = Objects.requireNonNull(bigQueryTable).getDefinition().getType(); + Boolean isPartitionFilterRequired = bigQueryTable.getRequirePartitionFilter(); + StandardTableDefinition tableDefinition = Objects.requireNonNull(bigQueryTable).getDefinition(); String query; if (type == Type.VIEW || type == Type.MATERIALIZED_VIEW || type == Type.EXTERNAL) { - query = generateQueryForMaterializingView(datasetProjectId, datasetId, tableName, filter); + query = generateQueryForMaterializingView(datasetProjectId, datasetId, tableName, filter, + limit, orderBy); } else { - query = generateQuery(partitionFromDate, partitionToDate, filter, projectId, datasetProjectId, datasetId, - tableName, serviceAccount, isServiceAccountFilePath); + query = generateQuery(partitionFromDate, partitionToDate, filter, datasetProjectId, + datasetId, + tableName, limit, orderBy, + 
isPartitionFilterRequired, tableDefinition); } if (query != null) { @@ -166,30 +178,35 @@ private void processQuery(JobContext context) throws IOException, InterruptedExc } @VisibleForTesting - String generateQuery(String partitionFromDate, String partitionToDate, String filter, String project, - String datasetProject, String dataset, String table, @Nullable String serviceAccount, - @Nullable Boolean isServiceAccountFilePath) { - if (partitionFromDate == null && partitionToDate == null && filter == null) { - return null; - } - String queryTemplate = "select * from `%s` where %s"; - com.google.cloud.bigquery.Table sourceTable = - BigQueryUtil.getBigQueryTable(datasetProject, dataset, table, serviceAccount, isServiceAccountFilePath, null, - null); - StandardTableDefinition tableDefinition = Objects.requireNonNull(sourceTable).getDefinition(); + String generateQuery(String partitionFromDate, String partitionToDate, String filter, + String datasetProject, String dataset, String table, String limit, String orderBy, + Boolean isPartitionFilterRequired, StandardTableDefinition tableDefinition) { + + RangePartitioning rangePartitioning = tableDefinition.getRangePartitioning(); TimePartitioning timePartitioning = tableDefinition.getTimePartitioning(); - if (timePartitioning == null && filter == null) { - return null; - } StringBuilder condition = new StringBuilder(); + String partitionCondition = null; if (timePartitioning != null) { - String timePartitionCondition = BigQueryUtil.generateTimePartitionCondition(tableDefinition, partitionFromDate, - partitionToDate); - condition.append(timePartitionCondition); + if (partitionFromDate == null && partitionToDate == null + && Objects.equals(isPartitionFilterRequired, Boolean.TRUE)) { + partitionCondition = BigQueryUtil.generateDefaultTimePartitionCondition(tableDefinition); + } else if (partitionFromDate != null || partitionToDate != null) { + partitionCondition = + BigQueryUtil.generateTimePartitionCondition(tableDefinition, 
partitionFromDate, + partitionToDate); + } + } else if (rangePartitioning != null && Objects.equals(isPartitionFilterRequired, + Boolean.TRUE)) { + partitionCondition = BigQueryUtil.generateDefaultRangePartitionCondition( + tableDefinition); + } + + if (!Strings.isNullOrEmpty(partitionCondition)) { + condition.append("(").append(partitionCondition).append(")"); } - if (filter != null) { + if (!Strings.isNullOrEmpty(filter)) { if (condition.length() == 0) { condition.append(filter); } else { @@ -198,20 +215,42 @@ String generateQuery(String partitionFromDate, String partitionToDate, String fi } String tableName = datasetProject + "." + dataset + "." + table; - return String.format(queryTemplate, tableName, condition.toString()); + StringBuilder query = new StringBuilder("select * from ").append(tableName); + + if (condition.length() > 0) { + query.append(" where ").append(condition); + } + + if (!Strings.isNullOrEmpty(orderBy)) { + query.append(" order by ").append(orderBy); + } + + if (!Strings.isNullOrEmpty(limit)) { + query.append(" limit ").append(limit); + } + + LOG.debug("Generated BigQuery query for job: {}", query); + return query.toString(); } @VisibleForTesting - String generateQueryForMaterializingView(String datasetProject, String dataset, String table, String filter) { - String queryTemplate = "select * from `%s`%s"; - StringBuilder condition = new StringBuilder(); - + String generateQueryForMaterializingView(String datasetProject, String dataset, String table, + String filter, String limit, String orderBy) { + String tableName = String.format("`%s.%s.%s`", datasetProject, dataset, table); + StringBuilder query = new StringBuilder("select * from ").append(tableName); if (!Strings.isNullOrEmpty(filter)) { - condition.append(String.format(" where %s", filter)); + query.append(" where ").append(filter); } - String tableName = datasetProject + "." + dataset + "." 
+ table; - return String.format(queryTemplate, tableName, condition.toString()); + if (!Strings.isNullOrEmpty(orderBy)) { + query.append(" order by ").append(orderBy); + } + + if (!Strings.isNullOrEmpty(limit)) { + query.append(" limit ").append(limit); + } + + return query.toString(); } /** diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/util/BigQueryConstants.java b/src/main/java/io/cdap/plugin/gcp/bigquery/util/BigQueryConstants.java index 3d0a0a17d..9a4589ff0 100644 --- a/src/main/java/io/cdap/plugin/gcp/bigquery/util/BigQueryConstants.java +++ b/src/main/java/io/cdap/plugin/gcp/bigquery/util/BigQueryConstants.java @@ -36,6 +36,8 @@ public interface BigQueryConstants { String CONFIG_TABLE_FIELDS = "cdap.bq.sink.table.fields"; String CONFIG_JSON_STRING_FIELDS = "cdap.bq.sink.json.string.fields"; String CONFIG_FILTER = "cdap.bq.source.filter"; + String CONFIG_LIMIT = "cdap.bq.source.limit"; + String CONFIG_ORDER_BY = "cdap.bq.source.order.by"; String CONFIG_PARTITION_FILTER = "cdap.bq.sink.partition.filter"; String CONFIG_JOB_ID = "cdap.bq.sink.job.id"; String CONFIG_VIEW_MATERIALIZATION_PROJECT = "cdap.bq.source.view.materialization.project"; diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/util/BigQueryUtil.java b/src/main/java/io/cdap/plugin/gcp/bigquery/util/BigQueryUtil.java index 32ebd149d..d75030305 100644 --- a/src/main/java/io/cdap/plugin/gcp/bigquery/util/BigQueryUtil.java +++ b/src/main/java/io/cdap/plugin/gcp/bigquery/util/BigQueryUtil.java @@ -24,6 +24,7 @@ import com.google.cloud.bigquery.Field; import com.google.cloud.bigquery.FieldList; import com.google.cloud.bigquery.LegacySQLTypeName; +import com.google.cloud.bigquery.RangePartitioning; import com.google.cloud.bigquery.StandardSQLTypeName; import com.google.cloud.bigquery.StandardTableDefinition; import com.google.cloud.bigquery.Table; @@ -82,7 +83,7 @@ public final class BigQueryUtil { private static final Logger LOG = LoggerFactory.getLogger(BigQueryUtil.class); - private static 
final String DEFAULT_PARTITION_COLUMN_NAME = "_PARTITIONTIME"; + public static final String DEFAULT_PARTITION_COLUMN_NAME = "_PARTITIONTIME"; private static final String BIGQUERY_BUCKET_PREFIX_PROPERTY_NAME = "gcp.bigquery.bucket.prefix"; public static final String BUCKET_PATTERN = "[a-z0-9._-]+"; @@ -778,6 +779,43 @@ public static String generateTimePartitionCondition(StandardTableDefinition tabl return timePartitionCondition.toString(); } + /** + * Generates a default "IS NOT NULL OR IS NULL" partition condition for a time-partitioned table. + * + * @param tableDefinition The definition of the table. + * @return The SQL condition string or an empty string if no condition is needed. + */ + public static String generateDefaultTimePartitionCondition( + StandardTableDefinition tableDefinition) { + TimePartitioning timePartitioning = tableDefinition.getTimePartitioning(); + if (timePartitioning == null) { + return StringUtils.EMPTY; + } + + String columnName = timePartitioning.getField() != null ? + timePartitioning.getField() : DEFAULT_PARTITION_COLUMN_NAME; + + return String.format("`%s` IS NOT NULL OR `%s` IS NULL", columnName, columnName); + } + + /** + * Generates a default "IS NOT NULL OR IS NULL" partition condition for a range-partitioned + * table. + * + * @param tableDefinition The definition of the table. + * @return The SQL condition string or an empty string if no condition is needed. + */ + public static String generateDefaultRangePartitionCondition( + StandardTableDefinition tableDefinition) { + RangePartitioning rangePartitioning = tableDefinition.getRangePartitioning(); + if (rangePartitioning == null || Strings.isNullOrEmpty(rangePartitioning.getField())) { + return StringUtils.EMPTY; + } + + String columnName = rangePartitioning.getField(); + return String.format("`%s` IS NOT NULL OR `%s` IS NULL", columnName, columnName); + } + /** * Get fully-qualified name (FQN) for a BQ table (FQN format: * bigquery:{projectId}.{datasetId}.{tableId}). 
diff --git a/src/test/java/io/cdap/plugin/gcp/bigquery/source/PartitionedBigQueryInputFormatTest.java b/src/test/java/io/cdap/plugin/gcp/bigquery/source/PartitionedBigQueryInputFormatTest.java index 8704f7ada..2f850d89d 100644 --- a/src/test/java/io/cdap/plugin/gcp/bigquery/source/PartitionedBigQueryInputFormatTest.java +++ b/src/test/java/io/cdap/plugin/gcp/bigquery/source/PartitionedBigQueryInputFormatTest.java @@ -16,63 +16,271 @@ package io.cdap.plugin.gcp.bigquery.source; +import com.google.cloud.bigquery.Field; +import com.google.cloud.bigquery.FieldList; +import com.google.cloud.bigquery.LegacySQLTypeName; +import com.google.cloud.bigquery.RangePartitioning; +import com.google.cloud.bigquery.Schema; import com.google.cloud.bigquery.StandardTableDefinition; -import com.google.cloud.bigquery.Table; -import io.cdap.plugin.gcp.bigquery.util.BigQueryUtil; +import com.google.cloud.bigquery.TimePartitioning; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; -import org.mockito.ArgumentMatchers; -import org.powermock.api.mockito.PowerMockito; -import org.powermock.core.classloader.annotations.PrepareForTest; -import org.powermock.modules.junit4.PowerMockRunner; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; -/** - * Unit Tests for generateQuery methods - */ -@RunWith(PowerMockRunner.class) -@PrepareForTest(BigQueryUtil.class) +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) public class PartitionedBigQueryInputFormatTest { + private static final String TEST_PROJECT = "test-project"; + private static final String TEST_DATASET = "test-dataset"; + private static final String TEST_TABLE = "test-table"; + private static final String TEST_FILTER = "age > 10"; + private static final String TEST_LIMIT = "100"; + private static final String TEST_ORDER_BY = "name asc"; + private static final String TEST_TABLE_SPEC = String.format("%s.%s.%s", TEST_PROJECT, 
TEST_DATASET, TEST_TABLE); + private static final String TEST_FROM_DATE = "2025-01-01"; + private static final String TEST_TO_DATE = "2025-01-02"; + private static final String TEST_PARTITION_CONDITION = + "TIMESTAMP(`_PARTITIONTIME`) >= TIMESTAMP(\"2025-01-01\") and " + + "TIMESTAMP(`_PARTITIONTIME`) < TIMESTAMP(\"2025-01-02\")"; + private static final String TEST_DEFAULT_TIME_CONDITION = + "(`_PARTITIONTIME` IS NOT NULL OR `_PARTITIONTIME` IS NULL)"; + private static final String TEST_DEFAULT_RANGE_CONDITION = "(`range_col` IS NOT NULL " + + "OR `range_col` IS NULL)"; + private static final String TEST_TIME_UNIT_COL = "my_date_col"; + private static final String TEST_TIME_UNIT_PARTITION_CONDITION = + "TIMESTAMP(`my_date_col`) >= TIMESTAMP(\"2025-01-01\") and " + + "TIMESTAMP(`my_date_col`) < TIMESTAMP(\"2025-01-02\")"; + private static final String TEST_DEFAULT_TIME_UNIT_CONDITION = + "(`my_date_col` IS NOT NULL OR `my_date_col` IS NULL)"; + + + @Mock + private StandardTableDefinition mockTableDefinition; + @Mock + private TimePartitioning mockTimePartitioning; + @Mock + private RangePartitioning mockRangePartitioning; + @Mock + private Schema mockSchema; + @Mock + private FieldList mockFieldList; + @Mock + private Field mockField; + + private PartitionedBigQueryInputFormat format; + + @Before + public void setUp() { + format = new PartitionedBigQueryInputFormat(); + when(mockTableDefinition.getTimePartitioning()).thenReturn(null); + when(mockTableDefinition.getRangePartitioning()).thenReturn(null); + } @Test - public void testGenerateQueryForMaterializingView() { - String datasetProject = "test_bq_dataset_project"; - String dataset = "test_bq_dataset"; - String table = "test_bq_table"; - String filter = "tableColumn = 'abc'"; - PartitionedBigQueryInputFormat partitionedBigQueryInputFormat = new PartitionedBigQueryInputFormat(); - String generatedQuery = partitionedBigQueryInputFormat.generateQueryForMaterializingView(datasetProject, dataset, - table, filter); - 
String expectedQuery = String.format("select * from `%s.%s.%s` where %s", datasetProject, dataset, table, filter); + public void testGenerateQueryForMaterializingView_WithFilterOnly() { + String expectedQuery = String.format("select * from `%s.%s.%s` where %s", + TEST_PROJECT, TEST_DATASET, TEST_TABLE, TEST_FILTER); + + String generatedQuery = format.generateQueryForMaterializingView( + TEST_PROJECT, TEST_DATASET, TEST_TABLE, TEST_FILTER, null, null); + Assert.assertEquals(expectedQuery, generatedQuery); + } - String expectedQueryWithoutFilter = String.format("select * from `%s.%s.%s`", datasetProject, dataset, table); - generatedQuery = partitionedBigQueryInputFormat.generateQueryForMaterializingView(datasetProject, dataset, - table, null); - Assert.assertEquals(expectedQueryWithoutFilter, generatedQuery); + @Test + public void testGenerateQueryForMaterializingView_NoFilterOrOptions() { + String expectedQuery = String.format("select * from `%s.%s.%s`", + TEST_PROJECT, TEST_DATASET, TEST_TABLE); + + String generatedQuery = format.generateQueryForMaterializingView( + TEST_PROJECT, TEST_DATASET, TEST_TABLE, null, null, null); + Assert.assertEquals(expectedQuery, generatedQuery); + } + + @Test + public void testGenerateQueryForMaterializingView_AllOptions() { + String expectedQuery = String.format("select * from `%s.%s.%s` where %s order by %s limit %s", + TEST_PROJECT, TEST_DATASET, TEST_TABLE, + TEST_FILTER, TEST_ORDER_BY, TEST_LIMIT); + + String generatedQuery = format.generateQueryForMaterializingView( + TEST_PROJECT, TEST_DATASET, TEST_TABLE, TEST_FILTER, TEST_LIMIT, TEST_ORDER_BY); + Assert.assertEquals(expectedQuery, generatedQuery); } @Test - public void testGenerateQuery() { - String datasetProject = "test_bq_dataset_project"; - String dataset = "test_bq_dataset"; - String table = "test_bq_table"; - String filter = "tableColumn = 'abc'"; - PartitionedBigQueryInputFormat partitionedBigQueryInputFormat = new PartitionedBigQueryInputFormat(); - 
PowerMockito.mockStatic(BigQueryUtil.class); - Table t = PowerMockito.mock(Table.class); - StandardTableDefinition tableDefinition = PowerMockito.mock(StandardTableDefinition.class); - PowerMockito.when(BigQueryUtil.getBigQueryTable(ArgumentMatchers.anyString(), ArgumentMatchers.anyString(), - ArgumentMatchers.anyString(), ArgumentMatchers.any(), ArgumentMatchers.anyBoolean(), ArgumentMatchers.any(), - ArgumentMatchers.any())).thenReturn(t); - PowerMockito.when(t.getDefinition()).thenReturn(tableDefinition); - String generatedQuery = partitionedBigQueryInputFormat.generateQuery(null, null, filter, datasetProject, - datasetProject, dataset, table, null, true); - String expectedQuery = String.format("select * from `%s.%s.%s` where %s", datasetProject, dataset, table, filter); + public void testGenerateQuery_WithFilterOnly() { + String expectedQuery = String.format("select * from %s where %s", + TEST_TABLE_SPEC, TEST_FILTER); + + String generatedQuery = format.generateQuery(null, null, + TEST_FILTER, TEST_PROJECT, TEST_DATASET, TEST_TABLE, + null, null, + false, mockTableDefinition); Assert.assertEquals(expectedQuery, generatedQuery); + } - generatedQuery = partitionedBigQueryInputFormat.generateQuery(null, null, null, datasetProject, datasetProject, - dataset, table, null, true); - Assert.assertNull(generatedQuery); + @Test + public void testGenerateQuery_NoOptions() { + String expectedQuery = String.format("select * from %s", TEST_TABLE_SPEC); + + String generatedQuery = format.generateQuery(null, null, + null, TEST_PROJECT, TEST_DATASET, TEST_TABLE, + null, null, + false, mockTableDefinition); + Assert.assertEquals(expectedQuery, generatedQuery); + } + + @Test + public void testGenerateQuery_AllOptions() { + String expectedQuery = String.format("select * from %s where %s order by %s limit %s", + TEST_TABLE_SPEC, TEST_FILTER, TEST_ORDER_BY, TEST_LIMIT); + + String generatedQuery = format.generateQuery(null, null, + TEST_FILTER, TEST_PROJECT, TEST_DATASET, 
TEST_TABLE, + TEST_LIMIT, TEST_ORDER_BY, + false, mockTableDefinition); + Assert.assertEquals(expectedQuery, generatedQuery); + } + + @Test + public void testGenerateQuery_TimePartitionWithDates() { + when(mockTableDefinition.getTimePartitioning()).thenReturn(mockTimePartitioning); + when(mockTimePartitioning.getField()).thenReturn(null); + + String expectedQuery = String.format("select * from %s where (%s)", + TEST_TABLE_SPEC, + TEST_PARTITION_CONDITION); + + String generatedQuery = format.generateQuery(TEST_FROM_DATE, TEST_TO_DATE, null, + TEST_PROJECT, TEST_DATASET, TEST_TABLE, + null, null, + false, mockTableDefinition); + Assert.assertEquals(expectedQuery, generatedQuery); + } + + + @Test + public void testGenerateQuery_TimePartitionRequiredNoDates() { + when(mockTableDefinition.getTimePartitioning()).thenReturn(mockTimePartitioning); + when(mockTimePartitioning.getField()).thenReturn(null); + + String expectedQuery = String.format("select * from %s where %s", + TEST_TABLE_SPEC, TEST_DEFAULT_TIME_CONDITION); + + String generatedQuery = format.generateQuery(null, null, + null, TEST_PROJECT, TEST_DATASET, TEST_TABLE, + null, null, + true, mockTableDefinition); + Assert.assertEquals(expectedQuery, generatedQuery); + } + + @Test + public void testGenerateQuery_RangePartitionRequiredNoDates() { + when(mockTableDefinition.getTimePartitioning()).thenReturn(null); + when(mockTableDefinition.getRangePartitioning()).thenReturn(mockRangePartitioning); + when(mockRangePartitioning.getField()).thenReturn("range_col"); + + String expectedQuery = String.format("select * from %s where %s", + TEST_TABLE_SPEC, TEST_DEFAULT_RANGE_CONDITION); + + String generatedQuery = format.generateQuery(null, null, null, + TEST_PROJECT, TEST_DATASET, TEST_TABLE, + null, null, + true, mockTableDefinition); + Assert.assertEquals(expectedQuery, generatedQuery); + } + + @Test + public void testGenerateQuery_TimePartitionRequiredAndFilter() { + 
when(mockTableDefinition.getTimePartitioning()).thenReturn(mockTimePartitioning); + when(mockTimePartitioning.getField()).thenReturn(null); + String expectedQuery = String.format("select * from %s where %s and (%s)", + TEST_TABLE_SPEC, TEST_DEFAULT_TIME_CONDITION, TEST_FILTER); + + String generatedQuery = format.generateQuery(null, null, + TEST_FILTER, TEST_PROJECT, TEST_DATASET, TEST_TABLE, + null, null, + true, mockTableDefinition); + Assert.assertEquals(expectedQuery, generatedQuery); + } + + @Test + public void testGenerateQuery_TimeUnitPartitionWithDates() { + when(mockTableDefinition.getTimePartitioning()).thenReturn(mockTimePartitioning); + when(mockTimePartitioning.getField()).thenReturn(TEST_TIME_UNIT_COL); + when(mockTableDefinition.getSchema()).thenReturn(mockSchema); + when(mockSchema.getFields()).thenReturn(mockFieldList); + when(mockFieldList.get(TEST_TIME_UNIT_COL)).thenReturn(mockField); + when(mockField.getType()).thenReturn(LegacySQLTypeName.DATE); + + String expectedQuery = String.format("select * from %s where (%s)", + TEST_TABLE_SPEC, TEST_TIME_UNIT_PARTITION_CONDITION); + + String generatedQuery = format.generateQuery(TEST_FROM_DATE, TEST_TO_DATE, null, + TEST_PROJECT, TEST_DATASET, TEST_TABLE, + null, null, + false, mockTableDefinition); + Assert.assertEquals(expectedQuery, generatedQuery); + } + + @Test + public void testGenerateQuery_TimeUnitPartitionRequiredNoDates() { + when(mockTableDefinition.getTimePartitioning()).thenReturn(mockTimePartitioning); + when(mockTimePartitioning.getField()).thenReturn(TEST_TIME_UNIT_COL); + + String expectedQuery = String.format("select * from %s where %s", + TEST_TABLE_SPEC, TEST_DEFAULT_TIME_UNIT_CONDITION); + + String generatedQuery = format.generateQuery(null, null, null, + TEST_PROJECT, TEST_DATASET, TEST_TABLE, + null, null, + true, mockTableDefinition); + Assert.assertEquals(expectedQuery, generatedQuery); + } + + @Test + public void testGenerateQuery_TimePartitionFilterNotRequiredWithDates() { + 
when(mockTableDefinition.getTimePartitioning()).thenReturn(mockTimePartitioning); + when(mockTimePartitioning.getField()).thenReturn(null); + + String expectedQuery = String.format("select * from %s where (%s)", + TEST_TABLE_SPEC, + TEST_PARTITION_CONDITION); + + String generatedQuery = format.generateQuery(TEST_FROM_DATE, TEST_TO_DATE, null, + TEST_PROJECT, TEST_DATASET, TEST_TABLE, + null, null, + false, mockTableDefinition); + Assert.assertEquals(expectedQuery, generatedQuery); + } + + @Test + public void testGenerateQuery_TimePartitionFilterNotRequiredNoDates() { + when(mockTableDefinition.getTimePartitioning()).thenReturn(mockTimePartitioning); + + String expectedQuery = String.format("select * from %s", TEST_TABLE_SPEC); + String generatedQuery = format.generateQuery(null, null, null, + TEST_PROJECT, TEST_DATASET, TEST_TABLE, + null, null, + false, mockTableDefinition); + Assert.assertEquals(expectedQuery, generatedQuery); + } + + @Test + public void testGenerateQuery_RangePartitionFilterNotRequired() { + when(mockTableDefinition.getRangePartitioning()).thenReturn(mockRangePartitioning); + + String expectedQuery = String.format("select * from %s", TEST_TABLE_SPEC); + + String generatedQuery = format.generateQuery(null, null, null, + TEST_PROJECT, TEST_DATASET, TEST_TABLE, + null, null, + false, mockTableDefinition); + Assert.assertEquals(expectedQuery, generatedQuery); } } diff --git a/widgets/BigQueryTable-batchsource.json b/widgets/BigQueryTable-batchsource.json index 6a7f92972..b16e91bac 100644 --- a/widgets/BigQueryTable-batchsource.json +++ b/widgets/BigQueryTable-batchsource.json @@ -151,6 +151,18 @@ "placeholder": "" } }, + { + "widget-type": "textbox", + "name": "orderBy", + "label": "Order By", + "widget-attributes": {"placeholder": "Column(s) to order by, e.g., 'name asc, age desc'"} + }, + { + "widget-type": "textbox", + "name": "limit", + "label": "Limit", + "widget-attributes": {"placeholder": "Maximum number of rows to read"} + }, { 
"widget-type": "textbox", "label": "Temporary Bucket Name",