|
49 | 49 | bq_dataset_name = 'airflow_bq_notify_dataset_{{ ds_nodash }}' |
50 | 50 | bq_recent_questions_table_id = 'recent_questions' |
51 | 51 | bq_most_popular_table_id = 'most_popular' |
52 | | -output_file = '{gcs_bucket}/recent_questionsS.csv'.format( |
53 | | - gcs_bucket=models.Variable.get('gcs_bucket')) |
| 52 | +gcs_bucket = models.Variable.get('gcs_bucket') |
| 53 | +output_file = f'{gcs_bucket}/recent_questions.csv' |
54 | 54 | location = 'US' |
55 | 55 | project_id = models.Variable.get('gcp_project') |
56 | 56 |
|
57 | | -# TODO: Update query dates |
58 | | -# TODO: fix string formatting |
59 | | - |
60 | 57 | # Data from the month of January 2018 |
61 | 58 | # You may change the query dates to get data from a different time range. You |
62 | 59 | # may also dynamically pick a date range based on DAG schedule date. Airflow |
|
66 | 63 | max_query_date = '2018-02-01' |
67 | 64 | min_query_date = '2018-01-01' |
68 | 65 |
|
69 | | -RECENT_QUESTIONS_QUERY = """ |
| 66 | +RECENT_QUESTIONS_QUERY = f""" |
70 | 67 | SELECT owner_display_name, title, view_count |
71 | 68 | FROM `bigquery-public-data.stackoverflow.posts_questions` |
72 | | - WHERE creation_date < CAST('{max_date}' AS TIMESTAMP) |
73 | | - AND creation_date >= CAST('{min_date}' AS TIMESTAMP) |
| 69 | + WHERE creation_date < CAST('{max_query_date}' AS TIMESTAMP) |
| 70 | + AND creation_date >= CAST('{min_query_date}' AS TIMESTAMP) |
74 | 71 | ORDER BY view_count DESC |
75 | 72 | LIMIT 100 |
76 | | - """.format(max_date=max_query_date, min_date=min_query_date) |
| 73 | + """ |
77 | 74 |
|
78 | | -MOST_POPULAR_QUERY = """ |
| 75 | +MOST_POPULAR_QUERY = f""" |
79 | 76 | SELECT title, view_count |
80 | | - FROM `{table}` |
| 77 | + FROM `{project_id}.{bq_dataset_name}.{bq_recent_questions_table_id}` |
81 | 78 | ORDER BY view_count DESC |
82 | 79 | LIMIT 1 |
83 | | - """.format(table=project_id + "." + bq_dataset_name + "." + bq_recent_questions_table_id) |
| 80 | + """ |
84 | 81 |
|
85 | 82 | yesterday = datetime.datetime.combine( |
86 | 83 | datetime.datetime.today() - datetime.timedelta(1), |
87 | 84 | datetime.datetime.min.time()) |
88 | 85 |
|
89 | | -# TODO: Add info about sendgrid operator config |
90 | 86 | # [START composer_notify_failure] |
91 | 87 | default_dag_args = { |
92 | 88 | 'start_date': yesterday, |
|
111 | 107 | task_id='make_bq_dataset', |
112 | 108 | # Executing 'bq' command requires Google Cloud SDK which comes |
113 | 109 | # preinstalled in Cloud Composer. |
114 | | - bash_command='bq ls {} || bq mk {}'.format( |
115 | | - bq_dataset_name, bq_dataset_name)) |
| 110 | + bash_command=f'bq ls {bq_dataset_name} || bq mk {bq_dataset_name}') |
116 | 111 | # [END composer_bash_bq] |
117 | 112 |
|
118 | 113 | # [START composer_bigquery] |
|
168 | 163 | table_id=bq_most_popular_table_id) |
169 | 164 |
|
170 | 165 | # [START composer_email] |
171 | | - # Send email confirmation |
| 166 | + # Send email confirmation (you will need to set up the email operator) |
172 | 167 | email_summary = email.EmailOperator( |
173 | 168 | task_id='email_summary', |
174 | 169 | to=models.Variable.get('email'), |
|
0 commit comments