Skip to content

Commit

Permalink
Merge pull request #163 from steemit/bugfix
Browse files Browse the repository at this point in the history
hivemind beta
  • Loading branch information
roadscape authored Jan 16, 2019
2 parents 40afe61 + 96571c3 commit f7a4679
Show file tree
Hide file tree
Showing 21 changed files with 613 additions and 231 deletions.
2 changes: 2 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,10 @@ WORKDIR /app

ADD scripts/entrypoint.sh /usr/local/bin/entrypoint.sh
ADD scripts/hivesync.sh /usr/local/bin/hivesync.sh
ADD scripts/hivesynccontinue.sh /usr/local/bin/hivesynccontinue.sh
RUN chmod +x /usr/local/bin/entrypoint.sh
RUN chmod +x /usr/local/bin/hivesync.sh
RUN chmod +x /usr/local/bin/hivesynccontinue.sh

RUN \
pip3 install . && \
Expand Down
6 changes: 6 additions & 0 deletions hive/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,12 @@ def run():
from hive.db.db_state import DbState
print(DbState.status())

#elif mode == 'sync-profile':
# from hive.indexer.sync import Sync
# from hive.utils.profiler import Profiler
# with Profiler():
# Sync(conf=conf).run()

else:
raise Exception("unknown run mode %s" % mode)

Expand Down
13 changes: 9 additions & 4 deletions hive/conf.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
"""Conf handles reading run-time config and app-level settings."""

import re
import logging
import configargparse

from hive.steem.client import SteemClient
from hive.db.adapter import Db
from hive.utils.normalize import strtobool, int_log_level
from hive.utils.stats import Stats
from hive.utils.stats import DbStats

def _sanitized_conf(parser):
"""Formats parser config, redacting database url password."""
out = parser.format_values()
return re.sub(r'(?<=:)\w+(?=@)', '<redacted>', out)

class Conf():
""" Manages sync/server configuration via args, ENVs, and hive.conf. """
Expand Down Expand Up @@ -51,11 +57,10 @@ def init_argparse(cls, strict=True, **kwargs):
root = logging.getLogger()
root.setLevel(conf.log_level())
root.info("loaded configuration:\n%s",
parser.format_values())
_sanitized_conf(parser))

# for API server, dump SQL report often
if conf.mode() == 'server':
Stats.PRINT_THRESH_MINS = 1
DbStats.SLOW_QUERY_MS = 750

return conf

Expand Down
2 changes: 1 addition & 1 deletion hive/db/adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,6 @@ def _is_write_query(sql):
if action == 'SELECT':
return False
if action in ['DELETE', 'UPDATE', 'INSERT', 'COMMIT', 'START',
'ALTER', 'TRUNCA', 'CREATE']:
'ALTER', 'TRUNCA', 'CREATE', 'DROP I']:
return True
raise Exception("unknown action: {}".format(sql))
73 changes: 60 additions & 13 deletions hive/db/db_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import time
import logging

from hive.db.schema import setup, build_metadata, teardown, DB_VERSION
from hive.db.schema import setup, reset_autovac, build_metadata, teardown, DB_VERSION
from hive.db.adapter import Db

log = logging.getLogger(__name__)
Expand Down Expand Up @@ -82,14 +82,21 @@ def _all_foreign_keys(cls):
@classmethod
def _disableable_indexes(cls):
to_locate = [
'hive_posts_ix1', # (parent_id)
'hive_posts_ix2', # (is_deleted, depth)
'hive_follows_ix2', # (following, follower, state=1)
'hive_follows_ix3', # (follower, following, state=1)
'hive_posts_ix3', # (author, depth, id)
'hive_posts_ix4', # (parent_id, id, is_deleted=0)
'hive_follows_ix5a', # (following, state, created_at, follower)
'hive_follows_ix5b', # (follower, state, created_at, following)
'hive_reblogs_ix1', # (post_id, account, created_at)
'hive_posts_cache_ix6', # (sc_trend, post_id)
'hive_posts_cache_ix7', # (sc_hot, post_id)
'hive_posts_cache_ix6a', # (sc_trend, post_id, paidout=0)
'hive_posts_cache_ix6b', # (post_id, sc_trend, paidout=0)
'hive_posts_cache_ix7a', # (sc_hot, post_id, paidout=0)
'hive_posts_cache_ix7b', # (post_id, sc_hot, paidout=0)
'hive_posts_cache_ix8', # (category, payout, depth, paidout=0)
'hive_posts_cache_ix9a', # (depth, payout, post_id, paidout=0)
'hive_posts_cache_ix9b', # (category, depth, payout, post_id, paidout=0)
'hive_accounts_ix3', # (vote_weight, name VPO)
'hive_accounts_ix4', # (id, name)
'hive_accounts_ix5', # (cached_at, name)
]

to_return = []
Expand Down Expand Up @@ -181,6 +188,7 @@ def _is_feed_cache_empty(cls):
@classmethod
def _check_migrations(cls):
"""Check current migration version and perform updates as needed."""
#pylint: disable=line-too-long
cls._ver = cls.db().query_one("SELECT db_version FROM hive_state LIMIT 1")
assert cls._ver is not None, 'could not load state record'

Expand All @@ -194,15 +202,11 @@ def _check_migrations(cls):
cls._set_ver(3)

if cls._ver == 3:
sql = """CREATE INDEX hive_accounts_ix3 ON hive_accounts
USING btree (vote_weight, name varchar_pattern_ops)"""
cls.db().query(sql)
cls.db().query("CREATE INDEX hive_accounts_ix3 ON hive_accounts (vote_weight, name varchar_pattern_ops)")
cls._set_ver(4)

if cls._ver == 4:
sql = """CREATE INDEX hive_follows_ix4 ON public.hive_follows
USING btree (follower, following) WHERE state = 2;"""
cls.db().query(sql)
cls.db().query("CREATE INDEX hive_follows_ix4 ON hive_follows (follower, following) WHERE state = 2")
cls._set_ver(5)

if cls._ver == 5:
Expand All @@ -215,6 +219,49 @@ def _check_migrations(cls):
Accounts.clear_ids()
cls._set_ver(6)

if cls._ver == 6:
cls.db().query("DROP INDEX hive_posts_cache_ix6")
cls.db().query("CREATE INDEX hive_posts_cache_ix6a ON hive_posts_cache (sc_trend, post_id) WHERE is_paidout = '0'")
cls.db().query("CREATE INDEX hive_posts_cache_ix6b ON hive_posts_cache (post_id, sc_trend) WHERE is_paidout = '0'")
cls.db().query("DROP INDEX hive_posts_cache_ix7")
cls.db().query("CREATE INDEX hive_posts_cache_ix7a ON hive_posts_cache (sc_hot, post_id) WHERE is_paidout = '0'")
cls.db().query("CREATE INDEX hive_posts_cache_ix7b ON hive_posts_cache (post_id, sc_hot) WHERE is_paidout = '0'")
cls._set_ver(7)

if cls._ver == 7:
cls.db().query("CREATE INDEX hive_accounts_ix4 ON hive_accounts (id, name)")
cls.db().query("CREATE INDEX hive_accounts_ix5 ON hive_accounts (cached_at, name)")
cls._set_ver(8)

if cls._ver == 8:
cls.db().query("DROP INDEX hive_follows_ix2")
cls.db().query("DROP INDEX hive_follows_ix3")
cls.db().query("DROP INDEX hive_follows_ix4")
cls.db().query("CREATE INDEX hive_follows_5a ON hive_follows (following, state, created_at, follower)")
cls.db().query("CREATE INDEX hive_follows_5b ON hive_follows (follower, state, created_at, following)")
cls._set_ver(9)

if cls._ver == 9:
from hive.indexer.follow import Follow
Follow.force_recount()
cls._set_ver(10)

if cls._ver == 10:
cls.db().query("CREATE INDEX hive_posts_cache_ix8 ON hive_posts_cache (category, payout, depth) WHERE is_paidout = '0'")
cls.db().query("CREATE INDEX hive_posts_cache_ix9a ON hive_posts_cache (depth, payout, post_id) WHERE is_paidout = '0'")
cls.db().query("CREATE INDEX hive_posts_cache_ix9b ON hive_posts_cache (category, depth, payout, post_id) WHERE is_paidout = '0'")
cls._set_ver(11)

if cls._ver == 11:
cls.db().query("DROP INDEX hive_posts_ix1")
cls.db().query("DROP INDEX hive_posts_ix2")
cls.db().query("CREATE INDEX hive_posts_ix3 ON hive_posts (author, depth, id) WHERE is_deleted = '0'")
cls.db().query("CREATE INDEX hive_posts_ix4 ON hive_posts (parent_id, id) WHERE is_deleted = '0'")
cls._set_ver(12)

reset_autovac(cls.db())

log.info("[HIVE] db version: %d", cls._ver)
assert cls._ver == DB_VERSION, "migration missing or invalid DB_VERSION"
# Example migration:
#if cls._ver == 1:
Expand Down
69 changes: 38 additions & 31 deletions hive/db/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

#pylint: disable=line-too-long, too-many-lines

DB_VERSION = 6
DB_VERSION = 12

def build_metadata():
"""Build schema def with SqlAlchemy"""
Expand Down Expand Up @@ -64,6 +64,8 @@ def build_metadata():
sa.Index('hive_accounts_ix1', 'vote_weight', 'id'), # core: quick ranks
sa.Index('hive_accounts_ix2', 'name', 'id'), # core: quick id map
sa.Index('hive_accounts_ix3', 'vote_weight', 'name', postgresql_ops=dict(name='varchar_pattern_ops')), # API: lookup
sa.Index('hive_accounts_ix4', 'id', 'name'), # API: quick filter/sort
sa.Index('hive_accounts_ix5', 'cached_at', 'name'), # core/listen sweep
mysql_engine='InnoDB',
mysql_default_charset='utf8mb4'
)
Expand All @@ -88,8 +90,8 @@ def build_metadata():
sa.ForeignKeyConstraint(['community'], ['hive_accounts.name'], name='hive_posts_fk2'),
sa.ForeignKeyConstraint(['parent_id'], ['hive_posts.id'], name='hive_posts_fk3'),
sa.UniqueConstraint('author', 'permlink', name='hive_posts_ux1'),
sa.Index('hive_posts_ix1', 'parent_id'), # API
sa.Index('hive_posts_ix2', 'is_deleted', 'depth'), # API
sa.Index('hive_posts_ix3', 'author', 'depth', 'id', postgresql_where=sql_text("is_deleted = '0'")), # API: author blog/comments
sa.Index('hive_posts_ix4', 'parent_id', 'id', postgresql_where=sql_text("is_deleted = '0'")), # API: fetching children
mysql_engine='InnoDB',
mysql_default_charset='utf8mb4'
)
Expand Down Expand Up @@ -121,9 +123,8 @@ def build_metadata():
sa.Column('created_at', sa.DateTime, nullable=False),

sa.UniqueConstraint('following', 'follower', name='hive_follows_ux3'), # core
sa.Index('hive_follows_ix2', 'following', 'follower', postgresql_where=sql_text("state = 1")), # API
sa.Index('hive_follows_ix3', 'follower', 'following', postgresql_where=sql_text("state = 1")), # API
sa.Index('hive_follows_ix4', 'follower', 'following', postgresql_where=sql_text("state = 2")), # API
sa.Index('hive_follows_ix5a', 'following', 'state', 'created_at', 'follower'),
sa.Index('hive_follows_ix5b', 'follower', 'state', 'created_at', 'following'),
mysql_engine='InnoDB',
mysql_default_charset='utf8mb4'
)
Expand Down Expand Up @@ -281,8 +282,14 @@ def build_metadata():

sa.Index('hive_posts_cache_ix2', 'promoted', postgresql_where=sql_text("is_paidout = '0' AND promoted > 0")), # API
sa.Index('hive_posts_cache_ix3', 'payout_at', 'post_id', postgresql_where=sql_text("is_paidout = '0'")), # core
sa.Index('hive_posts_cache_ix6', 'sc_trend', 'post_id'), # API
sa.Index('hive_posts_cache_ix7', 'sc_hot', 'post_id'), # API
sa.Index('hive_posts_cache_ix6a', 'sc_trend', 'post_id', postgresql_where=sql_text("is_paidout = '0'")), # API: global trending
sa.Index('hive_posts_cache_ix7a', 'sc_hot', 'post_id', postgresql_where=sql_text("is_paidout = '0'")), # API: global hot
sa.Index('hive_posts_cache_ix6b', 'post_id', 'sc_trend', postgresql_where=sql_text("is_paidout = '0'")), # API: filtered trending
sa.Index('hive_posts_cache_ix7b', 'post_id', 'sc_hot', postgresql_where=sql_text("is_paidout = '0'")), # API: filtered hot
sa.Index('hive_posts_cache_ix8', 'category', 'payout', 'depth', postgresql_where=sql_text("is_paidout = '0'")), # API: tag stats
sa.Index('hive_posts_cache_ix9a', 'depth', 'payout', 'post_id', postgresql_where=sql_text("is_paidout = '0'")), # API: payout
sa.Index('hive_posts_cache_ix9b', 'category', 'depth', 'payout', 'post_id', postgresql_where=sql_text("is_paidout = '0'")), # API: filtered payout

mysql_engine='InnoDB',
mysql_default_charset='utf8mb4'
)
Expand Down Expand Up @@ -326,26 +333,26 @@ def setup(db):
db.query(sql)

def reset_autovac(db):
"""Initializes per-table autovacuum/autoanalyze params"""
# consider using scale_factor = 0 with flat thresholds:
# autovacuum_vacuum_threshold, autovacuum_analyze_threshold

autovac_config = {
# default
'hive_accounts': (0.2, 0.1),
'hive_state': (0.2, 0.1),
'hive_reblogs': (0.2, 0.1),
'hive_payments': (0.2, 0.1),
# more aggresive
'hive_posts': (0.010, 0.005),
'hive_post_tags': (0.010, 0.005),
'hive_feed_cache': (0.010, 0.005),
# very aggresive
'hive_posts_cache': (0.0050, 0.0025), # @36M, ~2/day, 3/day (~240k new tuples daily)
'hive_blocks': (0.0100, 0.0014), # @20M, ~1/week, 1/day
'hive_follows': (0.0050, 0.0025)} # @47M, ~1/day, 3/day (~300k new tuples daily)

for table, (vacuum_sf, analyze_sf) in autovac_config.items():
sql = """ALTER TABLE %s SET (autovacuum_vacuum_scale_factor = %s,
autovacuum_analyze_scale_factor = %s)"""
db.query(sql % (table, vacuum_sf, analyze_sf))
"""Initializes/resets per-table autovacuum/autoanalyze params.
We use a scale factor of 0 and specify exact threshold tuple counts,
per-table, in the format (autovacuum_threshold, autoanalyze_threshold)."""

autovac_config = { # vacuum analyze
'hive_accounts': (50000, 100000),
'hive_posts_cache': (25000, 25000),
'hive_posts': (2500, 10000),
'hive_post_tags': (5000, 10000),
'hive_follows': (5000, 5000),
'hive_feed_cache': (5000, 5000),
'hive_blocks': (5000, 25000),
'hive_reblogs': (5000, 5000),
'hive_payments': (5000, 5000),
}

for table, (n_vacuum, n_analyze) in autovac_config.items():
sql = """ALTER TABLE %s SET (autovacuum_vacuum_scale_factor = 0,
autovacuum_vacuum_threshold = %s,
autovacuum_analyze_scale_factor = 0,
autovacuum_analyze_threshold = %s)"""
db.query(sql % (table, n_vacuum, n_analyze))
2 changes: 1 addition & 1 deletion hive/indexer/cached_post.py
Original file line number Diff line number Diff line change
Expand Up @@ -453,7 +453,7 @@ def _sql(cls, pid, post, level=None):
('is_hidden', "%d" % stats['hide']),
('is_grayed', "%d" % stats['gray']),
('author_rep', "%f" % stats['author_rep']),
('children', "%d" % min(post['children'], 32767)), # TODO: #115
('children', "%d" % min(post['children'], 32767)),
])

# if recounting, update the parent next pass.
Expand Down
4 changes: 4 additions & 0 deletions hive/indexer/custom_op.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@ def _process_legacy(cls, account, op_json, block_date):
@classmethod
def reblog(cls, account, op_json, block_date):
"""Handle legacy 'reblog' op"""
if ('account' not in op_json
or 'author' not in op_json
or 'permlink' not in op_json):
return
blogger = op_json['account']
author = op_json['author']
permlink = op_json['permlink']
Expand Down
20 changes: 12 additions & 8 deletions hive/indexer/follow.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,20 +162,24 @@ def force_recount(cls):
log.info("[SYNC] query follower counts")
sql = """
CREATE TEMPORARY TABLE following_counts AS (
SELECT follower account_id, COUNT(*) num
FROM hive_follows GROUP BY follower);
SELECT id account_id, COUNT(state) num
FROM hive_accounts
LEFT JOIN hive_follows hf ON id = hf.follower AND state = 1
GROUP BY id);
CREATE TEMPORARY TABLE follower_counts AS (
SELECT following account_id, COUNT(*) num
FROM hive_follows GROUP BY following);
SELECT id account_id, COUNT(state) num
FROM hive_accounts
LEFT JOIN hive_follows hf ON id = hf.following AND state = 1
GROUP BY id);
"""
DB.query(sql)

log.info("[SYNC] update follower counts")
sql = """
UPDATE hive_accounts SET followers = num
FROM follower_counts WHERE id = account_id;
UPDATE hive_accounts SET followers = num FROM follower_counts
WHERE id = account_id AND followers != num;
UPDATE hive_accounts SET following = num
FROM following_counts WHERE id = account_id;
UPDATE hive_accounts SET following = num FROM following_counts
WHERE id = account_id AND following != num;
"""
DB.query(sql)
Loading

0 comments on commit f7a4679

Please sign in to comment.