mirror of https://github.com/LukeHagar/pypistats.org.git, synced 2025-12-06 12:47:48 +00:00
single docker container using supervisord
@@ -1,30 +1,17 @@
 """Get the download stats for a specific day."""
 import datetime
-import json
 import time
 import os
 
 from google.auth.crypt._python_rsa import RSASigner
 from google.cloud import bigquery
 from google.oauth2.service_account import Credentials
-import pandas as pd
 import psycopg2
 from psycopg2.extras import execute_values
 
 from pypistats.run import celery
 
 
-# For local use.
-def load_env_vars(env="dev"):
-    """Load environment variables."""
-    local_path = os.path.join(
-        os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
-        "secret",
-        f"env_vars_{env}.json")
-    for key, value in json.load(open(local_path, 'r')).items():
-        os.environ[key] = value
-
-
 # Mirrors to disregard when considering downloads
 MIRRORS = ("bandersnatch", "z3c.pypimirror", "Artifactory", "devpi")
@@ -64,10 +51,7 @@ def get_google_credentials():
 def get_daily_download_stats(env="dev", date=None):
     """Get daily download stats for pypi packages from BigQuery."""
     start = time.time()
-    if os.environ.get("ENV", None) is None:
-        load_env_vars(env)
-    else:
-        env = os.environ.get("ENV")
+    env = os.environ.get("ENV")
 
     job_config = bigquery.QueryJobConfig()
     credentials = get_google_credentials()
     bq_client = bigquery.Client(
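Note: the two hunks above drop the `secret/env_vars_*.json` loader and read `ENV` straight from the process environment, which fits the single-container setup: processes managed by supervisord inherit the container's environment, so there is no local secrets file to load. A minimal sketch of the resulting pattern; the fallback value and the second variable name are illustrative assumptions, not taken from this diff:

```python
import os

# Sketch only: supervisord-managed processes inherit the container's
# environment, so config is read directly from os.environ.
env = os.environ.get("ENV", "dev")  # "dev" fallback is an assumption
db_host = os.environ.get("POSTGRESQL_HOST", "localhost")  # hypothetical name

print(f"running env={env!r}, db_host={db_host!r}")
```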
@@ -86,57 +70,45 @@ def get_daily_download_stats(env="dev", date=None):
     rows = list(iterator)
     print(len(rows), "rows from gbq")
 
-    data = []
+    data = {}
     for row in rows:
-        data.append((
-            date,
-            row['package'],
-            row['category_label'],
-            row['category'],
-            row['downloads']
-        ))
-
-    df = pd.DataFrame(data, columns=[
-        "date",
-        "package",
-        "category_label",
-        "category",
-        "downloads",
-    ])
+        if row["category_label"] not in data:
+            data[row["category_label"]] = []
+        data[row["category_label"]].append([
+            date,
+            row["package"],
+            row["category"],
+            row["downloads"],
+        ])
 
-    results = update_db(df, env)
+    results = update_db(data, env)
     print("Elapsed: " + str(time.time() - start))
     results["elapsed"] = time.time() - start
     return results
 
 
-def update_db(df, env="dev"):
+def update_db(data, env="dev"):
     """Update the db with new data by table."""
-    load_env_vars(env)
     connection, cursor = get_connection_cursor(env)
 
-    df_groups = df.groupby("category_label")
-
     success = {}
-    for category_label, df_category in df_groups:
+    for category_label, rows in data.items():
         table = category_label
-        df_category = df_category[[
-            "date",
-            "package",
-            "category",
-            "downloads",
-        ]]
         success[table] = update_table(
-            connection, cursor, table, df_category, date
+            connection, cursor, table, rows, date
         )
 
     return success
 
 
-def update_table(connection, cursor, table, df, date):
+def update_table(connection, cursor, table, rows, date):
     """Update a table."""
     print(table)
-    df = df.fillna("null")
+    for row in rows:
+        for idx, item in enumerate(row):
+            if item is None:
+                row[idx] = "null"
 
     delete_query = \
         f"""DELETE FROM {table}
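Note: the hunk above swaps the pandas pipeline (build a DataFrame, `groupby("category_label")`, slice columns) for a plain dict keyed by `category_label`, each value a list of `[date, package, category, downloads]` rows, which is what lets `import pandas as pd` go away. A self-contained sketch of the same grouping with made-up rows; `collections.defaultdict` is just an equivalent spelling of the diff's `if key not in data` check:

```python
from collections import defaultdict

# Made-up sample rows standing in for the BigQuery result.
rows = [
    {"package": "requests", "category_label": "python_minor",
     "category": "3.6", "downloads": 12345},
    {"package": "requests", "category_label": "python_minor",
     "category": "2.7", "downloads": 678},
]
date = "2018-04-23"

# Group rows by target table, shaped as (date, package, category, downloads).
data = defaultdict(list)
for row in rows:
    data[row["category_label"]].append(
        [date, row["package"], row["category"], row["downloads"]]
    )

print(dict(data))
# {'python_minor': [['2018-04-23', 'requests', '3.6', 12345],
#                   ['2018-04-23', 'requests', '2.7', 678]]}
```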
@@ -144,12 +116,12 @@ def update_table(connection, cursor, table, df, date):
     insert_query = \
         f"""INSERT INTO {table} (date, package, category, downloads)
            VALUES %s"""
-    values = list(df.itertuples(index=False, name=None))
 
     try:
         print(delete_query)
         cursor.execute(delete_query)
         print(insert_query)
-        execute_values(cursor, insert_query, values)
+        execute_values(cursor, insert_query, rows)
         connection.commit()
         return True
     except psycopg2.IntegrityError as e:
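Note: `psycopg2.extras.execute_values` accepts any sequence of row sequences, so the grouped lists can be passed straight through without the `df.itertuples(...)` conversion; the `%s` in `VALUES %s` is a single placeholder that the helper expands into every row of one multi-row INSERT. A hedged sketch of the call; the DSN and table name are placeholders, not from this repo:

```python
import psycopg2
from psycopg2.extras import execute_values

conn = psycopg2.connect("dbname=pypistats")  # placeholder DSN
cur = conn.cursor()

rows = [
    ["2018-04-23", "requests", "3.6", 12345],
    ["2018-04-23", "requests", "2.7", 678],
]

# execute_values expands VALUES %s into one statement covering all rows.
execute_values(
    cur,
    "INSERT INTO overall (date, package, category, downloads) VALUES %s",
    rows,
)
conn.commit()
```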
@@ -165,11 +137,6 @@ def update_all_package_stats(env="dev", date=None):
     if date is None:
         date = str(datetime.date.today() - datetime.timedelta(days=1))
 
-    if os.environ.get("ENV", None) is None:
-        load_env_vars(env)
-    else:
-        env = os.environ.get("ENV")
-
     connection, cursor = get_connection_cursor(env)
 
     success = {}
@@ -210,11 +177,6 @@ def update_recent_stats(env="dev", date=None):
     if date is None:
         date = str(datetime.date.today() - datetime.timedelta(days=1))
 
-    if os.environ.get("ENV", None) is None:
-        load_env_vars(env)
-    else:
-        env = os.environ.get("ENV")
-
     connection, cursor = get_connection_cursor(env)
 
     downloads_table = "overall"
@@ -285,11 +247,6 @@ def purge_old_data(env="dev", date=None):
     if date is None:
         date = str(datetime.date.today() - datetime.timedelta(days=1))
 
-    if os.environ.get("ENV", None) is None:
-        load_env_vars(env)
-    else:
-        env = os.environ.get("ENV")
-
     connection, cursor = get_connection_cursor(env)
 
     date = datetime.datetime.strptime(date, '%Y-%m-%d')
@@ -416,14 +373,8 @@ def etl():
     return results
 
 
-@celery.task
-def test():
-    """Test task for celery/beat."""
-    print("hello test.")
-
-
 if __name__ == "__main__":
-    date = "2018-04-19"
+    date = "2018-04-23"
     env = "prod"
     print(date, env)
     print(get_daily_download_stats(env, date))