How to append your data from Cloud Storage to BigQuery with Python (ETL)

Hello Everyone,

BigQuery is a fully managed enterprise data warehouse for analytics. It is inexpensive and highly scalable. In this article, I would like to share a basic tutorial for Google Cloud Storage and BigQuery with Python.

Installation
pip install google-cloud-bigquery

Create credentials

export GOOGLE_APPLICATION_CREDENTIALS="/home/user/Downloads/[FILE_NAME].json"

Additionally, make sure this environment variable is set in the environment where your Python script runs, so the client libraries can find the credentials.
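Once the variable is set, the client library picks the credentials up automatically. A minimal sketch to check that authentication works (the project ID below is a hypothetical placeholder):

from google.cloud import bigquery

# The client reads GOOGLE_APPLICATION_CREDENTIALS from the environment.
client = bigquery.Client(project='your-project-id')  # hypothetical project ID
print([d.dataset_id for d in client.list_datasets()])  # quick check that auth works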

Read from Cloud Storage and append to BigQuery

# Import libraries
from google.cloud import bigquery
from google.oauth2 import service_account

# Set credentials (create your own service account key file in your Google Cloud account)
credentials = service_account.Credentials.from_service_account_file(
    'C:\\Users\\talih\\Desktop\\BigQuerytoTableau-6d00b31bb9ab.json')
project_id = 'bigquery-to-tableau'

# Create the client with the project ID and credentials, and reference the destination table
client = bigquery.Client(credentials=credentials, project=project_id)
table_ref = client.dataset('BigTableau').table('dataflowbasics_schemas')

# Configure the load job with a few parameters
job_config = bigquery.LoadJobConfig()
job_config.write_disposition = bigquery.WriteDisposition.WRITE_APPEND  # append to the existing table
job_config.skip_leading_rows = 1  # skip the CSV header row
job_config.autodetect = True  # optional here, since an explicit schema is set below
job_config.allow_jagged_rows = True  # tolerate rows with missing trailing columns
job_config.ignore_unknown_values = True  # ignore values that do not match the schema
job_config.max_bad_records = 1000  # allow up to 1000 bad records before the job fails
schema = [
    bigquery.SchemaField('bietl_id', 'INTEGER', mode='REQUIRED'),
    bigquery.SchemaField('bietltools_name', 'STRING', mode='REQUIRED'),
    bigquery.SchemaField('bietl_usage', 'FLOAT', mode='NULLABLE'),
    bigquery.SchemaField('bietl_salary', 'INTEGER', mode='NULLABLE'),
]
job_config.schema = schema

# Set the source file in your Cloud Storage bucket
uri = 'gs://desctinations3tostorage/bietl20180829.csv'

# Send the load request to the Google API
load_job = client.load_table_from_uri(
    uri,
    table_ref,
    job_config=job_config)  # API request

assert load_job.job_type == 'load'

load_job.result()  # Waits for the table load to complete.

assert load_job.state == 'DONE'

Now you can access your data in the BigQuery interface.
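If you want to double-check the load from Python instead of the console, a minimal sketch reusing the client and table_ref from above could be:

# Fetch the destination table and print its current row count.
destination_table = client.get_table(table_ref)  # API request
print('Destination table now has {} rows.'.format(destination_table.num_rows))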

How to use dbt in a Python environment

dbt is a useful tool for building data marts on top of your data warehouse. You can find all the details in the official dbt documentation.

I have used it a few times, so I can show you how to create dbt models and configs in your own project. You can do that step by step as follows:

1 – Create a profiles.yml file for your dbt profile. Specify your database connection information, etc.
2 – Create a data_model folder to serve as the project directory.
3 – Create a .yml file as the main project file; you will reference it as the project file.
4 – Create your own dbt runner file, e.g. dbt_runner.py, and set your execution configs in it (see the sketch after this list).
5 – Create a models folder; you will put your models in that folder.
6 – Create a schema or model for yourself and put an xxxx.schema.yml file into that folder.
6.1 – Put some table constraints in it, like below:
bietl_patch:
  constraints:
    unique:
      - somthng_id
    not_null:
      - somthng_id
      - xxx_id
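
Step 4 mentions a runner file. Here is a minimal sketch of what dbt_runner.py could look like, assuming you simply shell out to the dbt CLI; the folder names are the hypothetical ones from the steps above:

# dbt_runner.py - minimal sketch that shells out to the dbt CLI
import subprocess

PROJECT_DIR = 'bietl_data_model'  # hypothetical project folder from the steps above
PROFILES_DIR = '.'                # directory that contains profiles.yml

def run_dbt():
    # Run all models in the project; raises CalledProcessError if dbt fails.
    subprocess.run(
        ['dbt', 'run', '--project-dir', PROJECT_DIR, '--profiles-dir', PROFILES_DIR],
        check=True,
    )

if __name__ == '__main__':
    run_dbt()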

In the end, your folder and schema layout will look like this:
# dbt profile. Specify your DB connection information, etc.
profiles.yml in the root directory
bietl_data_model folder
bietl_datamarts.yml file
dbt_runner.py Python file

bietl_data_model
  > models
    > specification of your bietl models
    > bietl_datamarts.schema.yml
    > .sql files for the models

I am executing dbt from Airflow DAGs, but I did not cover that here; maybe in a next post.