add env var setup; do google credentials via env vars; update pypi tasks

This commit is contained in:
crflynn
2018-04-08 16:49:48 -04:00
parent 3f2b2fa64f
commit d0109364d6
4 changed files with 147 additions and 82 deletions

1
.gitignore vendored
View File

@@ -1,6 +1,7 @@
# credentials
secret.*
env_vars*
# mac osx
.DS_Store

View File

@@ -1,61 +1,66 @@
"""Application configuration."""
import json
import os
from pypistats.secret import postgresql
from pypistats.secret import github
# Load env vars
ENV = os.environ.get("ENV", None)
# If none then load dev locally.
if ENV is None:
local_path = os.path.join(
os.path.dirname(os.path.abspath(__file__)),
"secret",
"env_vars_dev.json")
for key, value in json.load(open(local_path, 'r')).items():
os.environ[key] = value
def get_db_uri(env):
"""Get the database URI."""
return \
"postgresql://{username}:{password}@{host}:{port}/{dbname}".format(
username=postgresql[env]["username"],
password=postgresql[env]["password"],
host=postgresql[env]["host"],
port=postgresql[env]["port"],
dbname=postgresql[env]["dbname"],
username=os.environ.get("POSTGRESQL_USERNAME"),
password=os.environ.get("POSTGRESQL_PASSWORD"),
host=os.environ.get("POSTGRESQL_HOST"),
port=os.environ.get("POSTGRESQL_PORT"),
dbname=os.environ.get("POSTGRESQL_DBNAME"),
)
class Config(object):
"""Base configuration."""
SECRET_KEY = os.environ.get("PYPISTATS_SECRET", "secret-key")
APP_DIR = os.path.abspath(os.path.dirname(__file__))
GITHUB_CLIENT_ID = os.environ.get("GITHUB_CLIENT_ID")
GITHUB_CLIENT_SECRET = os.environ.get("GITHUB_CLIENT_SECRET")
PROJECT_ROOT = os.path.abspath(os.path.join(APP_DIR, os.pardir))
SECRET_KEY = os.environ.get("PYPISTATS_SECRET", "secret-key")
SQLALCHEMY_DATABASE_URI = get_db_uri(ENV)
SQLALCHEMY_TRACK_MODIFICATIONS = False
class ProdConfig(Config):
"""Production configuration."""
ENV = "prod"
DEBUG = False
SQLALCHEMY_DATABASE_URI = get_db_uri(ENV)
GITHUB_CLIENT_ID = github[ENV]["client_id"]
GITHUB_CLIENT_SECRET = github[ENV]["client_secret"]
ENV = "prod"
class DevConfig(Config):
"""Development configuration."""
ENV = "dev"
DEBUG = True
SQLALCHEMY_DATABASE_URI = get_db_uri(ENV)
GITHUB_CLIENT_ID = github[ENV]["client_id"]
GITHUB_CLIENT_SECRET = github[ENV]["client_secret"]
ENV = "dev"
class TestConfig(Config):
"""Test configuration."""
DEBUG = True
ENV = "dev"
TESTING = True
DEBUG = True
SQLALCHEMY_DATABASE_URI = get_db_uri(ENV)
WTF_CSRF_ENABLED = False # Allows form testing
GITHUB_CLIENT_ID = github[ENV]["client_id"]
GITHUB_CLIENT_SECRET = github[ENV]["client_secret"]
configs = {

View File

@@ -1,23 +1,36 @@
"""Get the download stats for a specific day."""
import datetime
import json
import time
import os
# import sys
# from google.api_core.exceptions import Conflict
from google.auth.crypt._python_rsa import RSASigner
from google.cloud import bigquery
from google.oauth2.service_account import Credentials
import pandas as pd
import psycopg2
from psycopg2.extras import execute_values
from pypistats.secret import postgresql
# Load env vars
ENV = os.environ.get("ENV", None)
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = \
os.path.join(
# If none then load dev locally.
if ENV is None:
local_path = os.path.join(
os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
"secret",
"secret.json",
)
"env_vars_dev.json")
for key, value in json.load(open(local_path, 'r')).items():
os.environ[key] = value
# # OLD: FOR LOCAL EXECUTION
# os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = \
# os.path.join(
# os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
# "secret",
# "secret.json",
# )
# Mirrors to disregard when considering downloads
MIRRORS = ("bandersnatch", "z3c.pypimirror", "Artifactory", "devpi")
@@ -42,10 +55,38 @@ PSQL_TABLES = ["overall", "python_major", "python_minor", "system"]
MAX_RECORD_AGE = 45
def get_google_credentials():
"""Obtain the Google credentials object explicitly."""
private_key = os.environ["GOOGLE_PRIVATE_KEY"]
private_key_id = os.environ["GOOGLE_PRIVATE_KEY_ID"]
signer = RSASigner.from_string(key=private_key, key_id=private_key_id)
project_id = os.environ["GOOGLE_PROJECT_ID"]
service_account_email = os.environ["GOOGLE_CLIENT_EMAIL"]
scopes = (
'https://www.googleapis.com/auth/bigquery',
'https://www.googleapis.com/auth/cloud-platform'
)
token_uri = os.environ["GOOGLE_TOKEN_URI"]
credentials = Credentials(
signer=signer,
service_account_email=service_account_email,
token_uri=token_uri,
scopes=scopes,
project_id=project_id,
)
return credentials
def get_daily_download_stats(date, env="dev"):
"""Get daily download stats for pypi packages from BigQuery."""
start = time.time()
job_config = bigquery.QueryJobConfig()
bq_client = bigquery.Client()
credentials = get_google_credentials()
bq_client = bigquery.Client(
project=os.environ["GOOGLE_PROJECT_ID"],
credentials=credentials
)
# # Prepare a reference to the new dataset
# dataset_ref = bq_client.dataset(DATASET_ID)
@@ -67,50 +108,52 @@ def get_daily_download_stats(date, env="dev"):
# except Conflict:
# pass
local = False
if env == "dev":
try:
print("Loading from csv...")
df = pd.read_csv("ignore/sample_data.csv", index_col=0)
print("Done.")
# print(set(df["category_label"].values))
# sys.exit()
local = True
except Exception:
print("Loading failed.")
# local = False
# if env == "dev":
# try:
# print("Loading from csv...")
# df = pd.read_csv("ignore/sample_data.csv", index_col=0)
# print("Done.")
# # print(set(df["category_label"].values))
# # sys.exit()
# local = True
# except Exception:
# print("Loading failed.")
if not local:
print("Querying BigQuery...")
# Get and perform the query, writing to destination table
query = get_query(date)
print("Done.")
# job_config.destination = table_ref
# job_config.write_disposition = "WRITE_TRUNCATE"
query_job = bq_client.query(query, job_config=job_config)
iterator = query_job.result()
rows = list(iterator)
# if not local:
print("Querying BigQuery...")
# Get and perform the query, writing to destination table
query = get_query(date)
print("Done.")
# job_config.destination = table_ref
# job_config.write_disposition = "WRITE_TRUNCATE"
query_job = bq_client.query(query, job_config=job_config)
iterator = query_job.result()
rows = list(iterator)
data = []
for row in rows:
data.append((
date,
row['package'],
row['category_label'],
row['category'],
row['downloads']
))
data = []
for row in rows:
data.append((
date,
row['package'],
row['category_label'],
row['category'],
row['downloads']
))
df = pd.DataFrame(data, columns=[
"date",
"package",
"category_label",
"category",
"downloads",
])
df = pd.DataFrame(data, columns=[
"date",
"package",
"category_label",
"category",
"downloads",
])
df.to_csv("ignore/sample_data.csv")
df.to_csv("ignore/sample_data.csv")
return update_db(df, env)
results = update_db(df, env)
print("Elapsed: " + str(time.time() - start))
return results
def update_db(df, env="dev"):
@@ -163,12 +206,14 @@ def update_table(connection, cursor, table, df, date):
def update_all_package_stats(date, env="dev"):
"""Update stats for __all__ packages."""
print("__all__")
start = time.time()
connection, cursor = get_connection_cursor(env)
success = {}
for table in PSQL_TABLES:
aggregate_query = \
f"""SELECT date, '__all__' AS package, category, sum(downloads) AS downloads
FROM {table} GROUP BY date, category"""
FROM {table} where date = '{date}' GROUP BY date, category"""
cursor.execute(aggregate_query, (table,))
values = cursor.fetchall()
@@ -182,15 +227,19 @@ def update_all_package_stats(date, env="dev"):
cursor.execute(delete_query)
execute_values(cursor, insert_query, values)
connection.commit()
return True
success[table] = True
except psycopg2.IntegrityError as e:
connection.rollback()
return False
success[table] = False
print("Elapsed: " + str(time.time() - start))
return success
def update_recent_stats(date, env="dev"):
"""Update daily, weekly, monthly stats for all packages."""
print("recent")
start = time.time()
connection, cursor = get_connection_cursor(env)
downloads_table = "overall"
@@ -207,9 +256,9 @@ def update_recent_stats(date, env="dev"):
}
success = {}
for time, clause in where.items():
for period, clause in where.items():
select_query = \
f"""SELECT package, '{time}' as category, sum(downloads) AS downloads
f"""SELECT package, '{period}' as category, sum(downloads) AS downloads
FROM {downloads_table}
WHERE category = 'without_mirrors' and {clause}
GROUP BY package"""
@@ -218,7 +267,7 @@ def update_recent_stats(date, env="dev"):
delete_query = \
f"""DELETE FROM {recent_table}
WHERE category = '{time}'"""
WHERE category = '{period}'"""
insert_query = \
f"""INSERT INTO {recent_table}
(package, category, downloads) VALUES %s"""
@@ -226,20 +275,23 @@ def update_recent_stats(date, env="dev"):
cursor.execute(delete_query)
execute_values(cursor, insert_query, values)
connection.commit()
success[time] = True
success[period] = True
except psycopg2.IntegrityError as e:
connection.rollback()
success[time] = False
success[period] = False
print("Elapsed: " + str(time.time() - start))
return success
def get_connection_cursor(env):
"""Get a db connection cursor."""
connection = psycopg2.connect(
dbname=postgresql[env]['dbname'],
user=postgresql[env]['username'],
password=postgresql[env]['password'],
host=postgresql[env]['host'],
port=postgresql[env]['port'],
dbname=os.environ["POSTGRESQL_DBNAME"],
user=os.environ["POSTGRESQL_USERNAME"],
password=os.environ["POSTGRESQL_PASSWORD"],
host=os.environ["POSTGRESQL_HOST"],
port=os.environ["POSTGRESQL_PORT"],
# sslmode='require',
)
cursor = connection.cursor()
@@ -248,6 +300,8 @@ def get_connection_cursor(env):
def purge_old_data(date, env="dev", age=MAX_RECORD_AGE):
"""Purge old data records."""
print("Purge")
start = time.time()
connection, cursor = get_connection_cursor(env)
date = datetime.datetime.strptime(date, '%Y-%m-%d')
@@ -265,6 +319,7 @@ def purge_old_data(date, env="dev", age=MAX_RECORD_AGE):
connection.rollback()
success[table] = False
print("Elapsed: " + str(time.time() - start))
return success
@@ -360,5 +415,8 @@ def get_query(date):
if __name__ == "__main__":
date = "2018-02-08"
print(get_daily_download_stats(date))
date = "2018-02-09"
env = "dev"
# print(get_daily_download_stats(date, env))
print(update_all_package_stats(date, env))
# print(update_recent_stats(date, env))

View File

@@ -7,6 +7,7 @@
},
"profile_name": "default",
"project_name": "pypistats",
"remote_env": "s3://pypistats/config/env_vars_dev.json",
"runtime": "python3.6",
"s3_bucket": "pypistats"
}