mirror of https://github.com/LukeHagar/pypistats.dev.git (synced 2025-12-09 12:47:48 +00:00)
updates (#31)
* update ignore files
* setup poetry
* add db seeds
* black
* set up compose
* backfill script
* add makefile, update readme
* update poetry
* readme
* Fixes
* cleanup and rate limit changes
* poetry 1.0.5
* some more cleanup
* k8s
* k8s
* update yml
* cleanup and admin
* deploy
@@ -1,16 +1,15 @@
"""Get the download stats for a specific day."""
import datetime
import time
import os
import time

import psycopg2
from google.auth.crypt._python_rsa import RSASigner
from google.cloud import bigquery
from google.oauth2.service_account import Credentials
import psycopg2
from psycopg2.extras import execute_values

from pypistats.run import celery

from pypistats.extensions import celery

# Mirrors to disregard when considering downloads
MIRRORS = ("bandersnatch", "z3c.pypimirror", "Artifactory", "devpi")
@@ -27,16 +26,13 @@ MAX_RECORD_AGE = 180

def get_google_credentials():
"""Obtain the Google credentials object explicitly."""
private_key = os.environ["GOOGLE_PRIVATE_KEY"]
private_key = os.environ["GOOGLE_PRIVATE_KEY"].replace('"', "").replace("\\n", "\n")
private_key_id = os.environ["GOOGLE_PRIVATE_KEY_ID"]
signer = RSASigner.from_string(key=private_key, key_id=private_key_id)

project_id = os.environ["GOOGLE_PROJECT_ID"]
service_account_email = os.environ["GOOGLE_CLIENT_EMAIL"]
scopes = (
'https://www.googleapis.com/auth/bigquery',
'https://www.googleapis.com/auth/cloud-platform'
)
scopes = ("https://www.googleapis.com/auth/bigquery", "https://www.googleapis.com/auth/cloud-platform")
token_uri = os.environ["GOOGLE_TOKEN_URI"]
credentials = Credentials(
signer=signer,
@@ -48,16 +44,13 @@ def get_google_credentials():
return credentials


def get_daily_download_stats(env="dev", date=None):
def get_daily_download_stats(date):
"""Get daily download stats for pypi packages from BigQuery."""
start = time.time()

job_config = bigquery.QueryJobConfig()
credentials = get_google_credentials()
bq_client = bigquery.Client(
project=os.environ["GOOGLE_PROJECT_ID"],
credentials=credentials
)
bq_client = bigquery.Client(project=os.environ["GOOGLE_PROJECT_ID"], credentials=credentials)
if date is None:
date = str(datetime.date.today() - datetime.timedelta(days=1))

@@ -76,29 +69,22 @@ def get_daily_download_stats(env="dev", date=None):
for row in rows:
if row["category_label"] not in data:
data[row["category_label"]] = []
data[row["category_label"]].append([
date,
row["package"],
row["category"],
row["downloads"],
])
data[row["category_label"]].append([date, row["package"], row["category"], row["downloads"]])

results = update_db(data, env, date)
results = update_db(data, date)
print("Elapsed: " + str(time.time() - start))
results["elapsed"] = time.time() - start
return results


def update_db(data, env="dev", date=None):
def update_db(data, date=None):
"""Update the db with new data by table."""
connection, cursor = get_connection_cursor(env)
connection, cursor = get_connection_cursor()

success = {}
for category_label, rows in data.items():
table = category_label
success[table] = update_table(
connection, cursor, table, rows, date
)
success[table] = update_table(connection, cursor, table, rows, date)

return success

@@ -130,11 +116,9 @@ def update_table(connection, cursor, table, rows, date):
for idx in sorted(delete_rows, reverse=True):
rows.pop(idx)

delete_query = \
f"""DELETE FROM {table}
delete_query = f"""DELETE FROM {table}
WHERE date = '{date}'"""
insert_query = \
f"""INSERT INTO {table} (date, package, category, downloads)
insert_query = f"""INSERT INTO {table} (date, package, category, downloads)
VALUES %s"""

try:
@@ -149,7 +133,7 @@ def update_table(connection, cursor, table, rows, date):
return False


def update_all_package_stats(env="dev", date=None):
def update_all_package_stats(date=None):
"""Update stats for __all__ packages."""
print("__all__")
start = time.time()
@@ -157,21 +141,18 @@ def update_all_package_stats(env="dev", date=None):
if date is None:
date = str(datetime.date.today() - datetime.timedelta(days=1))

connection, cursor = get_connection_cursor(env)
connection, cursor = get_connection_cursor()

success = {}
for table in PSQL_TABLES:
aggregate_query = \
f"""SELECT date, '__all__' AS package, category, sum(downloads) AS downloads
aggregate_query = f"""SELECT date, '__all__' AS package, category, sum(downloads) AS downloads
FROM {table} where date = '{date}' GROUP BY date, category"""
cursor.execute(aggregate_query, (table,))
values = cursor.fetchall()

delete_query = \
f"""DELETE FROM {table}
delete_query = f"""DELETE FROM {table}
WHERE date = '{date}' and package = '__all__'"""
insert_query = \
f"""INSERT INTO {table} (date, package, category, downloads)
insert_query = f"""INSERT INTO {table} (date, package, category, downloads)
VALUES %s"""
try:
print(delete_query)
@@ -189,7 +170,7 @@ def update_all_package_stats(env="dev", date=None):
return success


def update_recent_stats(env="dev", date=None):
def update_recent_stats(date=None):
"""Update daily, weekly, monthly stats for all packages."""
print("recent")
start = time.time()
@@ -197,7 +178,7 @@ def update_recent_stats(env="dev", date=None):
if date is None:
date = str(datetime.date.today() - datetime.timedelta(days=1))

connection, cursor = get_connection_cursor(env)
connection, cursor = get_connection_cursor()

downloads_table = "overall"
recent_table = "recent"
@@ -214,19 +195,16 @@ def update_recent_stats(env="dev", date=None):

success = {}
for period, clause in where.items():
select_query = \
f"""SELECT package, '{period}' as category, sum(downloads) AS downloads
select_query = f"""SELECT package, '{period}' as category, sum(downloads) AS downloads
FROM {downloads_table}
WHERE category = 'without_mirrors' and {clause}
GROUP BY package"""
cursor.execute(select_query)
values = cursor.fetchall()

delete_query = \
f"""DELETE FROM {recent_table}
delete_query = f"""DELETE FROM {recent_table}
WHERE category = '{period}'"""
insert_query = \
f"""INSERT INTO {recent_table}
insert_query = f"""INSERT INTO {recent_table}
(package, category, downloads) VALUES %s"""
try:
print(delete_query)
@@ -244,7 +222,7 @@ def update_recent_stats(env="dev", date=None):
return success


def get_connection_cursor(env):
def get_connection_cursor():
"""Get a db connection cursor."""
connection = psycopg2.connect(
dbname=os.environ["POSTGRESQL_DBNAME"],
@@ -258,7 +236,7 @@ def get_connection_cursor(env):
return connection, cursor


def purge_old_data(env="dev", date=None):
def purge_old_data(date=None):
"""Purge old data records."""
print("Purge")
age = MAX_RECORD_AGE
@@ -267,11 +245,11 @@ def purge_old_data(env="dev", date=None):
if date is None:
date = str(datetime.date.today() - datetime.timedelta(days=1))

connection, cursor = get_connection_cursor(env)
connection, cursor = get_connection_cursor()

date = datetime.datetime.strptime(date, '%Y-%m-%d')
date = datetime.datetime.strptime(date, "%Y-%m-%d")
purge_date = date - datetime.timedelta(days=age)
purge_date = purge_date.strftime('%Y-%m-%d')
purge_date = purge_date.strftime("%Y-%m-%d")

success = {}
for table in PSQL_TABLES:
@@ -290,9 +268,9 @@ def purge_old_data(env="dev", date=None):
return success


def vacuum_analyze(env="dev"):
def vacuum_analyze():
"""Vacuum and analyze the db."""
connection, cursor = get_connection_cursor(env)
connection, cursor = get_connection_cursor()
connection.set_isolation_level(0)

results = {}
@@ -321,7 +299,7 @@ def get_query(date):
FROM
`the-psf.pypi.downloads{date.replace("-", "")}`
WHERE
REGEXP_CONTAINS(details.python,r'^[0-9]+\.[0-9]+.{{0,}}$') OR
REGEXP_CONTAINS(details.python,r'^[0-9]\.[0-9]+.{{0,}}$') OR
details.python IS NULL )
SELECT
package,
@@ -341,11 +319,7 @@ def get_query(date):
SELECT
package,
'python_minor' AS category_label,
cast(CONCAT(SPLIT(python_version, '.')[
OFFSET
(0)],'.',SPLIT(python_version, '.')[
OFFSET
(1)]) as string) AS category,
REGEXP_EXTRACT(python_version, r'^[0-9]+\.[0-9]+') AS category,
COUNT(*) AS downloads
FROM
dls
@@ -398,25 +372,34 @@ def get_query(date):


@celery.task
def etl():
def etl(date=None, purge=True):
"""Perform the stats download."""
env = os.environ.get("ENV")
date = str(datetime.date.today() - datetime.timedelta(days=1))
if date is None:
date = str(datetime.date.today() - datetime.timedelta(days=1))
results = dict()
results["purge"] = purge_old_data(env, date)
results["downloads"] = get_daily_download_stats(env, date)
results["__all__"] = update_all_package_stats(env, date)
results["recent"] = update_recent_stats(env, date)
results["cleanup"] = vacuum_analyze(env)
results["downloads"] = get_daily_download_stats(date)
results["__all__"] = update_all_package_stats(date)
results["recent"] = update_recent_stats()
results["cleanup"] = vacuum_analyze()
if purge:
results["purge"] = purge_old_data(date)
return results


@celery.task
def example(thing):
print(thing)
print("Sleeping")
time.sleep(10)
print("done")


if __name__ == "__main__":
date = "2018-12-23"
env = "prod"
print(date, env)
# print(purge_old_data(env, date))
print(get_daily_download_stats(env, date))
print(update_all_package_stats(env, date))
print(update_recent_stats(env, date))
run_date = "2020-01-09"
print(run_date)
# print(purge_old_data(run_date))
# vacuum_analyze()
print(get_daily_download_stats(run_date))
print(update_all_package_stats(run_date))
# print(update_recent_stats(run_date))
# vacuum_analyze(env)
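For context, a minimal sketch of how the refactored task might be driven after this change. The pypistats.tasks.pypi import path is an assumption, the GOOGLE_* and POSTGRESQL_* environment variables referenced in the diff must be set, and .delay() further assumes the Celery app from pypistats.extensions is configured with a broker:

# Hypothetical usage sketch, not part of this commit.
from pypistats.tasks.pypi import etl  # assumed module path for the file above

# Backfill a single day synchronously, skipping the purge step.
results = etl(date="2020-01-09", purge=False)
print(results)

# Or enqueue the default run (yesterday's date, purge included) on a Celery worker.
etl.delay()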