Deploying and managing Google BigQuery for enterprise data analytics
Google BigQuery is a fully managed, serverless data warehouse that runs fast SQL queries on Google's infrastructure. It lets you analyze very large datasets with high performance and cost efficiency.
Key Features
Serverless: No infrastructure to manage or provision
Petabyte Scale: Query petabytes of data with ease
Separation of Storage and Compute: Pay only for the storage you use and the queries you run
Real-time Analytics: Stream data for real-time analysis and ML
Geospatial Analysis: Built-in support for geographic data types
ML Integration: Train and run ML models directly in SQL with BigQuery ML (see the sketch after this list)
BI Engine: In-memory analysis service for sub-second query response
Federated Queries: Query data from external sources without copying
Data Governance: Column-level security and dynamic data masking
Cost Controls: Custom quotas and a choice of on-demand or capacity-based pricing
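As a concrete illustration of the ML integration point above, here is a minimal sketch that trains and queries a BigQuery ML model through the Python client library. The analytics.customer_features tables, the churned label column, and the model name are hypothetical placeholders, not objects defined elsewhere in this guide.
# bqml_sketch.py -- illustrative only; dataset, table, and column names are placeholders
from google.cloud import bigquery

client = bigquery.Client()

# CREATE MODEL runs as an ordinary query job; result() blocks until training finishes.
client.query("""
CREATE OR REPLACE MODEL `your-project-id.analytics.churn_model`
OPTIONS (model_type = 'logistic_reg', input_label_cols = ['churned']) AS
SELECT customer_type, region, total_orders, churned
FROM `your-project-id.analytics.customer_features`
""").result()

# Score new rows with plain SQL via ML.PREDICT.
for row in client.query("""
SELECT customer_id, predicted_churned
FROM ML.PREDICT(
  MODEL `your-project-id.analytics.churn_model`,
  (SELECT * FROM `your-project-id.analytics.customer_features_today`))
""").result():
    print(row.customer_id, row.predicted_churned)
The remainder of this guide walks through a Terraform configuration for a CMEK-protected dataset with authorized views, the equivalent day-to-day operations with the bq command-line tool, a Cloud Function that loads files from Cloud Storage, and an example dashboard query.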
resource "google_bigquery_dataset" "sensitive_data" {
dataset_id = "sensitive_data"
friendly_name = "Sensitive Data"
description = "Contains PII and other sensitive information"
location = "US"
# Enable CMEK (Customer Managed Encryption Keys)
default_encryption_configuration {
kms_key_name = google_kms_crypto_key.bq_key.id
}
# Set access controls
access {
role = "OWNER"
user_by_email = google_service_account.bq_admin.email
}
# No direct access for analysts
}
# Create an authorized view dataset
resource "google_bigquery_dataset" "authorized_views" {
dataset_id = "authorized_views"
friendly_name = "Authorized Views"
description = "Contains secured views for analysts"
location = "US"
access {
role = "OWNER"
user_by_email = google_service_account.bq_admin.email
}
access {
role = "READER"
group_by_email = "analysts@example.com"
}
}
# Create a KMS key for encryption
resource "google_kms_key_ring" "bigquery" {
name = "bigquery-keyring"
  location = "us" # must match the dataset location ("US" multi-region)
}
resource "google_kms_crypto_key" "bq_key" {
name = "bigquery-key"
key_ring = google_kms_key_ring.bigquery.id
}
# Grant BigQuery service account access to the KMS key
data "google_project" "project" {}
resource "google_kms_crypto_key_iam_member" "crypto_key" {
crypto_key_id = google_kms_crypto_key.bq_key.id
role = "roles/cloudkms.cryptoKeyEncrypterDecrypter"
member = "serviceAccount:bq-${data.google_project.project.number}@bigquery-encryption.iam.gserviceaccount.com"
}
# Create sensitive data table
resource "google_bigquery_table" "customer_data" {
dataset_id = google_bigquery_dataset.sensitive_data.dataset_id
table_id = "customer_data"
schema = <<EOF
[
{
"name": "customer_id",
"type": "STRING",
"mode": "REQUIRED"
},
{
"name": "full_name",
"type": "STRING",
"mode": "REQUIRED"
},
{
"name": "email",
"type": "STRING",
"mode": "REQUIRED"
},
{
"name": "address",
"type": "STRING",
"mode": "NULLABLE"
},
{
"name": "date_of_birth",
"type": "DATE",
"mode": "NULLABLE"
},
{
"name": "region",
"type": "STRING",
"mode": "NULLABLE"
},
{
"name": "customer_type",
"type": "STRING",
"mode": "NULLABLE"
}
]
EOF
# Row-level filtering for this table is applied with row access policies (see the sketch after this configuration)
deletion_protection = true
}
# Create authorized view
resource "google_bigquery_table" "customer_view" {
dataset_id = google_bigquery_dataset.authorized_views.dataset_id
table_id = "customer_view"
view {
query = <<EOF
SELECT
customer_id,
CONCAT(SUBSTR(full_name, 1, 1), '***') AS masked_name,
  REGEXP_REPLACE(email, r'(.+)@(.+)', r'****@\2') AS masked_email,
region,
customer_type
FROM
${google_bigquery_dataset.sensitive_data.dataset_id}.${google_bigquery_table.customer_data.table_id}
EOF
use_legacy_sql = false
}
deletion_protection = true
}
# Grant view access to the underlying table
resource "google_bigquery_dataset_access" "authorized_view_access" {
dataset_id = google_bigquery_dataset.sensitive_data.dataset_id
view {
project_id = data.google_project.project.project_id
dataset_id = google_bigquery_dataset.authorized_views.dataset_id
table_id = google_bigquery_table.customer_view.table_id
}
}
# Grant the regional analyst groups read access to the table. IAM conditions cannot
# filter rows within a table, so the actual region-based filtering is done with
# row access policies (see the sketch after this block).
resource "google_bigquery_table_iam_policy" "policy" {
  project     = data.google_project.project.project_id
  dataset_id  = google_bigquery_dataset.sensitive_data.dataset_id
  table_id    = google_bigquery_table.customer_data.table_id
  policy_data = data.google_iam_policy.table_readers.policy_data
}
data "google_iam_policy" "table_readers" {
  binding {
    role = "roles/bigquery.dataViewer"
    members = [
      "group:us-analysts@example.com",
      "group:eu-analysts@example.com",
    ]
  }
}
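The IAM bindings above control who can read the table at all; the region-based filtering itself is implemented with BigQuery row access policies, which are created with DDL rather than through the google_bigquery_table resource. A minimal sketch, run through the Python client, using the same table and analyst groups as the Terraform example (swap in your own project ID):
# row_access_policies.py -- DDL sketch; the project ID is a placeholder
from google.cloud import bigquery

client = bigquery.Client()

# Each policy filters the rows visible to its grantees; users also need
# table-level read access (granted by the IAM policy above).
client.query("""
CREATE OR REPLACE ROW ACCESS POLICY us_region_filter
ON `your-project-id.sensitive_data.customer_data`
GRANT TO ('group:us-analysts@example.com')
FILTER USING (region = 'US')
""").result()

client.query("""
CREATE OR REPLACE ROW ACCESS POLICY eu_region_filter
ON `your-project-id.sensitive_data.customer_data`
GRANT TO ('group:eu-analysts@example.com')
FILTER USING (region = 'EU')
""").result()
With the secure foundation in place, the next section covers day-to-day operations with the bq command-line tool.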
# Create a dataset
bq mk \
--dataset \
--description="Sales analytics dataset" \
--location=US \
your-project-id:sales_analytics
# Create a table with schema
bq mk \
--table \
--schema="transaction_id:STRING,customer_id:STRING,product_id:STRING,quantity:INTEGER,price:FLOAT,timestamp:TIMESTAMP" \
your-project-id:sales_analytics.transactions
# Load data into a table from Cloud Storage
bq load \
--source_format=CSV \
--skip_leading_rows=1 \
your-project-id:sales_analytics.transactions \
gs://your-bucket/sales-data/transactions.csv \
transaction_id:STRING,customer_id:STRING,product_id:STRING,quantity:INTEGER,price:FLOAT,timestamp:TIMESTAMP
# Run a simple query
bq query \
--use_legacy_sql=false \
'SELECT COUNT(*) as transaction_count FROM `your-project-id.sales_analytics.transactions`'
# Run a query and save results to a table
bq query \
--use_legacy_sql=false \
--destination_table=sales_analytics.daily_summary \
--append_table \
'SELECT
DATE(timestamp) as date,
COUNT(*) as transactions,
SUM(price * quantity) as revenue
FROM
`your-project-id.sales_analytics.transactions`
GROUP BY
date
ORDER BY
date DESC'
# Export a table to Cloud Storage
bq extract \
your-project-id:sales_analytics.daily_summary \
gs://your-bucket/exports/daily_summary.csv
# Grant dataset access (dataset-level access is managed through the dataset's
# access entries, not bq add-iam-policy-binding; a Python equivalent follows below)
bq show --format=prettyjson your-project-id:sales_analytics > dataset_acl.json
# Edit dataset_acl.json to add an access entry for user@example.com, then apply it:
bq update --source dataset_acl.json your-project-id:sales_analytics
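The same dataset-level grant can be applied programmatically by patching the dataset's access entries. A minimal sketch with the Python client, using the same placeholder project, dataset, and user as the commands above:
# grant_dataset_access.py -- sketch; identifiers are the placeholders used above
from google.cloud import bigquery

client = bigquery.Client()

# Fetch the dataset, append a READER entry for the user, and patch it back.
dataset = client.get_dataset("your-project-id.sales_analytics")
entries = list(dataset.access_entries)
entries.append(
    bigquery.AccessEntry(
        role="READER",
        entity_type="userByEmail",
        entity_id="user@example.com",
    )
)
dataset.access_entries = entries
client.update_dataset(dataset, ["access_entries"])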
# Grant table access (the resource is passed as a positional argument)
bq add-iam-policy-binding \
  --member=serviceAccount:etl-service@your-project-id.iam.gserviceaccount.com \
  --role=roles/bigquery.dataEditor \
  your-project-id:sales_analytics.transactions
# View the current IAM policy for a table
bq get-iam-policy your-project-id:sales_analytics.transactions
The next step moves from one-off commands to an automated pipeline: a Cloud Function, triggered by a Pub/Sub message, that loads newly arrived files from Cloud Storage into BigQuery.
# main.py
import base64
import json
from google.cloud import bigquery
# Initialize the BigQuery client once per function instance
bigquery_client = bigquery.Client()
def process_transaction_data(event, context):
"""Cloud Function triggered by a Pub/Sub event"""
# Decode the Pub/Sub message
pubsub_message = base64.b64decode(event['data']).decode('utf-8')
message_data = json.loads(pubsub_message)
# Get source file details
bucket_name = message_data['bucket']
file_name = message_data['file']
# Set up the job configuration
job_config = bigquery.LoadJobConfig(
schema=[
bigquery.SchemaField("transaction_id", "STRING"),
bigquery.SchemaField("transaction_date", "TIMESTAMP"),
bigquery.SchemaField("customer_id", "STRING"),
bigquery.SchemaField("product_id", "STRING"),
bigquery.SchemaField("quantity", "INTEGER"),
bigquery.SchemaField("unit_price", "FLOAT"),
bigquery.SchemaField("total_amount", "FLOAT"),
bigquery.SchemaField("payment_method", "STRING"),
bigquery.SchemaField("store_id", "STRING"),
bigquery.SchemaField("json_payload", "JSON"),
],
source_format=bigquery.SourceFormat.CSV,
skip_leading_rows=1,
)
# Get the table reference
table_id = f"{message_data['project_id']}.raw_data.transactions"
# Create a job to load data from GCS to BigQuery
uri = f"gs://{bucket_name}/{file_name}"
load_job = bigquery_client.load_table_from_uri(
uri, table_id, job_config=job_config
)
# Wait for the job to complete
load_job.result()
    # Report load statistics
    table = bigquery_client.get_table(table_id)
    print(f"Loaded {load_job.output_rows} rows; {table_id} now has {table.num_rows} rows")
return f"Loaded {load_job.output_rows} rows into {table_id}"
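The function above covers batch loads from Cloud Storage. For the real-time path mentioned in the feature list, rows can also be streamed directly into the same table; a minimal sketch, assuming the raw_data.transactions table targeted by the load job (the row values are illustrative placeholders):
# streaming_insert.py -- sketch; the table name and row values are placeholders
from google.cloud import bigquery

client = bigquery.Client()

# Streamed rows become queryable within seconds; dict keys must match column names.
rows = [{
    "transaction_id": "txn-0001",
    "transaction_date": "2024-01-15T10:32:00Z",
    "customer_id": "cust-42",
    "product_id": "sku-7",
    "quantity": 2,
    "unit_price": 19.99,
    "total_amount": 39.98,
    "payment_method": "card",
    "store_id": "store-3",
}]

errors = client.insert_rows_json("your-project-id.raw_data.transactions", rows)
if errors:
    raise RuntimeError(f"Streaming insert failed: {errors}")
Downstream, dashboards query the modeled data marts, as in the following example.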
-- Example query for sales dashboard
SELECT
d.year,
d.month,
  FORMAT_DATE('%b %Y', DATE(d.year, d.month, 1)) AS month_year,
c.category,
SUM(s.total_revenue) AS revenue,
SUM(s.total_units_sold) AS units_sold,
SUM(s.transaction_count) AS transactions,
SUM(s.unique_customers) AS customers
FROM
`your-project-id.data_marts.sales_by_month` s
JOIN
`your-project-id.data_warehouse.dim_date` d ON (s.year = d.year AND s.month = d.month)
JOIN
`your-project-id.data_warehouse.dim_product` p ON s.product_id = p.product_id
JOIN
`your-project-id.data_warehouse.dim_category` c ON p.category_id = c.category_id
WHERE
d.date >= DATE_SUB(CURRENT_DATE(), INTERVAL 12 MONTH)
GROUP BY
d.year, d.month, month_year, c.category
ORDER BY
d.year, d.month, c.category
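To run this dashboard query from an application with a configurable lookback window and a hard cap on bytes scanned (the cost-controls feature listed earlier), here is a sketch using query parameters and maximum_bytes_billed; the table names are the same placeholders as in the query above, and the 10 GB cap is an arbitrary example.
# dashboard_query.py -- sketch; table names and the byte cap are placeholders
from google.cloud import bigquery

client = bigquery.Client()

job_config = bigquery.QueryJobConfig(
    query_parameters=[
        bigquery.ScalarQueryParameter("lookback_months", "INT64", 12),
    ],
    # Fail the job instead of scanning more than ~10 GB.
    maximum_bytes_billed=10 * 1024**3,
)

sql = """
SELECT d.year, d.month, SUM(s.total_revenue) AS revenue
FROM `your-project-id.data_marts.sales_by_month` s
JOIN `your-project-id.data_warehouse.dim_date` d
  ON s.year = d.year AND s.month = d.month
WHERE d.date >= DATE_SUB(CURRENT_DATE(), INTERVAL @lookback_months MONTH)
GROUP BY d.year, d.month
ORDER BY d.year, d.month
"""

for row in client.query(sql, job_config=job_config).result():
    print(row.year, row.month, row.revenue)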