docker setup; celery/beat config

This commit is contained in:
crflynn
2018-04-22 17:43:40 -04:00
parent 9cb0cfb9d1
commit 0cbdd8398d
18 changed files with 284 additions and 403 deletions

View File

@@ -11,6 +11,8 @@ import pandas as pd
import psycopg2
from psycopg2.extras import execute_values
from pypistats.run import celery
# For local use.
def load_env_vars(env="dev"):
@@ -29,16 +31,6 @@ MIRRORS = ("bandersnatch", "z3c.pypimirror", "Artifactory", "devpi")
# PyPI systems
SYSTEMS = ("Windows", "Linux", "Darwin")
# BigQuery definitions
DATASET_ID = "pypistats"
TABLE_ID = "pypistats"
SCHEMA = [
bigquery.SchemaField("package", "STRING", mode="REQUIRED"),
bigquery.SchemaField("category_label", "STRING", mode="REQUIRED"),
bigquery.SchemaField("category", "STRING", mode="NULLABLE"),
bigquery.SchemaField("downloads", "INTEGER", mode="NULLABLE"),
]
# postgresql tables to update for __all__
PSQL_TABLES = ["overall", "python_major", "python_minor", "system"]
@@ -69,63 +61,28 @@ def get_google_credentials():
return credentials
def get_daily_download_stats(event, context):
def get_daily_download_stats(env="dev", date=None):
"""Get daily download stats for pypi packages from BigQuery."""
start = time.time()
env = event["kwargs"]["env"]
if os.environ.get("ENV", None) is None:
load_env_vars(env)
else:
env = os.environ.get("ENV")
job_config = bigquery.QueryJobConfig()
credentials = get_google_credentials()
bq_client = bigquery.Client(
project=os.environ["GOOGLE_PROJECT_ID"],
credentials=credentials
)
date = event["kwargs"].get("date", None)
if date is None:
date = str(datetime.date.today() - datetime.timedelta(days=1))
# # Prepare a reference to the new dataset
# dataset_ref = bq_client.dataset(DATASET_ID)
# dataset = bigquery.Dataset(dataset_ref)
#
# # Create the dataset
# try:
# dataset = bq_client.create_dataset(dataset)
# except Conflict:
# pass
#
# # Prepare a reference to the table
# table_ref = dataset_ref.table(TABLE_ID)
# table = bigquery.Table(table_ref, schema=SCHEMA)
#
# # Create the table
# try:
# table = bq_client.create_table(table)
# except Conflict:
# pass
# local = False
# if env == "dev":
# try:
# print("Loading from csv...")
# df = pd.read_csv("ignore/sample_data.csv", index_col=0)
# print("Done.")
# # print(set(df["category_label"].values))
# # sys.exit()
# local = True
# except Exception:
# print("Loading failed.")
# if not local:
print("Querying BigQuery...")
# Get and perform the query, writing to destination table
print("Sending query to BigQuery...")
query = get_query(date)
print("Done.")
# job_config.destination = table_ref
# job_config.write_disposition = "WRITE_TRUNCATE"
print("Sent.")
query_job = bq_client.query(query, job_config=job_config)
iterator = query_job.result()
print("Downloading results.")
rows = list(iterator)
print(len(rows), "rows from gbq")
@@ -147,11 +104,9 @@ def get_daily_download_stats(event, context):
"downloads",
])
# # For local testing
# df.to_csv("ignore/sample_data.csv")
results = update_db(df, env)
print("Elapsed: " + str(time.time() - start))
results["elapsed"] = time.time() - start
return results
@@ -174,9 +129,6 @@ def update_db(df, env="dev"):
success[table] = update_table(
connection, cursor, table, df_category, date
)
# update_all_package_stats(cursor, table, date)
# update_recent_stats(cursor, date)
return success
@@ -205,18 +157,18 @@ def update_table(connection, cursor, table, df, date):
return False
def update_all_package_stats(event, context):
def update_all_package_stats(env="dev", date=None):
"""Update stats for __all__ packages."""
print("__all__")
start = time.time()
date = event["kwargs"].get("date", None)
if date is None:
date = str(datetime.date.today() - datetime.timedelta(days=1))
env = event["kwargs"]["env"]
if os.environ.get("ENV", None) is None:
load_env_vars(env)
else:
env = os.environ.get("ENV")
connection, cursor = get_connection_cursor(env)
@@ -246,21 +198,22 @@ def update_all_package_stats(event, context):
success[table] = False
print("Elapsed: " + str(time.time() - start))
success["elapsed"] = time.time() - start
return success
def update_recent_stats(event, context):
def update_recent_stats(env="dev", date=None):
"""Update daily, weekly, monthly stats for all packages."""
print("recent")
start = time.time()
date = event["kwargs"].get("date", None)
if date is None:
date = str(datetime.date.today() - datetime.timedelta(days=1))
env = event["kwargs"]["env"]
if os.environ.get("ENV", None) is None:
load_env_vars(env)
else:
env = os.environ.get("ENV")
connection, cursor = get_connection_cursor(env)
@@ -305,6 +258,7 @@ def update_recent_stats(event, context):
success[period] = False
print("Elapsed: " + str(time.time() - start))
success["elapsed"] = time.time() - start
return success
@@ -322,19 +276,19 @@ def get_connection_cursor(env):
return connection, cursor
def purge_old_data(event, context):
def purge_old_data(env="dev", date=None):
"""Purge old data records."""
print("Purge")
age = MAX_RECORD_AGE
start = time.time()
date = event["kwargs"].get("date", None)
if date is None:
date = str(datetime.date.today() - datetime.timedelta(days=1))
env = event["kwargs"]["env"]
if os.environ.get("ENV", None) is None:
load_env_vars(env)
else:
env = os.environ.get("ENV")
connection, cursor = get_connection_cursor(env)
@@ -355,6 +309,7 @@ def purge_old_data(event, context):
success[table] = False
print("Elapsed: " + str(time.time() - start))
success["elapsed"] = time.time() - start
return success
@@ -449,17 +404,28 @@ def get_query(date):
"""
if __name__ == "__main__":
date = "2018-04-18"
env = "prod"
event = {
"kwargs": {
"date": date,
"env": env,
}
@celery.task
def etl():
"""Perform the stats download."""
results = {
"downloads": get_daily_download_stats(),
"__all__": update_all_package_stats(),
"recent": update_recent_stats(),
"purge": purge_old_data(),
}
context = None
return results
@celery.task
def test():
"""Test task for celery/beat."""
print("hello test.")
if __name__ == "__main__":
date = "2018-04-19"
env = "prod"
print(date, env)
print(get_daily_download_stats(event, context))
print(update_all_package_stats(event, context))
print(update_recent_stats(event, context))
print(get_daily_download_stats(env, date))
print(update_all_package_stats(env, date))
print(update_recent_stats(env, date))