diff --git a/api/dbv1/get_users.sql.go b/api/dbv1/get_users.sql.go index 5824eb07..ec92a410 100644 --- a/api/dbv1/get_users.sql.go +++ b/api/dbv1/get_users.sql.go @@ -15,57 +15,6 @@ import ( ) const getUsers = `-- name: GetUsers :many -WITH input_users AS ( - SELECT DISTINCT unnest($2::int[])::int AS user_id -), -current_user_followees AS ( - SELECT followee_user_id - FROM follows - WHERE $1 > 0 - AND follower_user_id = $1 - AND is_delete = false -), -current_user_following_targets AS ( - SELECT f.followee_user_id AS user_id - FROM follows f - JOIN input_users i ON i.user_id = f.followee_user_id - WHERE $1 > 0 - AND f.follower_user_id = $1 - AND f.is_delete = false - GROUP BY f.followee_user_id -), -current_user_subscribed_targets AS ( - SELECT s.user_id - FROM subscriptions s - JOIN input_users i ON i.user_id = s.user_id - WHERE $1 > 0 - AND s.subscriber_id = $1 - AND s.is_delete = false - GROUP BY s.user_id -), -targets_following_current_user AS ( - SELECT f.follower_user_id AS user_id - FROM follows f - JOIN input_users i ON i.user_id = f.follower_user_id - WHERE $1 > 0 - AND f.followee_user_id = $1 - AND f.is_delete = false - GROUP BY f.follower_user_id -), -current_user_followee_follow_counts AS ( - SELECT target.user_id, count(*)::bigint AS count - FROM current_user_followees mf - JOIN LATERAL ( - SELECT f.followee_user_id AS user_id - FROM follows f - WHERE f.follower_user_id = mf.followee_user_id - AND f.followee_user_id = ANY($2::int[]) - AND f.followee_user_id != $1 - AND f.is_delete = false - OFFSET 0 - ) target ON TRUE - GROUP BY target.user_id -) SELECT album_count, artist_pick_track_id, @@ -155,17 +104,10 @@ SELECT is_storage_v2, creator_node_endpoint, - -- "Of the people I follow, how many also follow this user?" - -- - -- Compute viewer relationship state once for the whole @ids batch. The - -- previous correlated shape repeated the mutual-follow count for every - -- returned user, which multiplied the viewer followee scan by result size. - -- The CTE above drives from the viewer's followees once and probes the - -- target batch via the fanout index. - COALESCE(current_user_followee_follow_counts.count, 0)::bigint AS current_user_followee_follow_count, - (current_user_following_targets.user_id IS NOT NULL)::bool AS does_current_user_follow, - (current_user_subscribed_targets.user_id IS NOT NULL)::bool AS does_current_user_subscribe, - (targets_following_current_user.user_id IS NOT NULL)::bool AS does_follow_current_user, + 0::bigint AS current_user_followee_follow_count, + false::bool AS does_current_user_follow, + false::bool AS does_current_user_subscribe, + false::bool AS does_follow_current_user, handle_lc, u.updated_at, @@ -243,14 +185,6 @@ LEFT JOIN sol_claimable_accounts spl_user_bank LEFT JOIN sol_claimable_accounts usdc_user_bank ON usdc_user_bank.ethereum_address = u.wallet AND usdc_user_bank.mint = 'EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v' -- USDC -LEFT JOIN current_user_followee_follow_counts - ON current_user_followee_follow_counts.user_id = u.user_id -LEFT JOIN current_user_following_targets - ON current_user_following_targets.user_id = u.user_id -LEFT JOIN current_user_subscribed_targets - ON current_user_subscribed_targets.user_id = u.user_id -LEFT JOIN targets_following_current_user - ON targets_following_current_user.user_id = u.user_id WHERE u.user_id = ANY($2::int[]) ORDER BY u.user_id ` diff --git a/api/dbv1/models.go b/api/dbv1/models.go index c3c637ec..4cbdc75e 100644 --- a/api/dbv1/models.go +++ b/api/dbv1/models.go @@ -2654,6 +2654,20 @@ type UserScoreFeature struct { UpdatedAt pgtype.Timestamptz `json:"updated_at"` } +type UserSocialSet struct { + UserID int32 `json:"user_id"` + FolloweesBitmap []byte `json:"followees_bitmap"` + FollowersBitmap []byte `json:"followers_bitmap"` + UpdatedAt time.Time `json:"updated_at"` +} + +type UserSocialSetDirty struct { + UserID int32 `json:"user_id"` + FolloweesDirty bool `json:"followees_dirty"` + FollowersDirty bool `json:"followers_dirty"` + UpdatedAt time.Time `json:"updated_at"` +} + type UserTip struct { Slot int32 `json:"slot"` Signature string `json:"signature"` diff --git a/api/dbv1/queries/get_users.sql b/api/dbv1/queries/get_users.sql index 468a76b8..5c3e196e 100644 --- a/api/dbv1/queries/get_users.sql +++ b/api/dbv1/queries/get_users.sql @@ -1,55 +1,4 @@ -- name: GetUsers :many -WITH input_users AS ( - SELECT DISTINCT unnest(@ids::int[])::int AS user_id -), -current_user_followees AS ( - SELECT followee_user_id - FROM follows - WHERE @my_id > 0 - AND follower_user_id = @my_id - AND is_delete = false -), -current_user_following_targets AS ( - SELECT f.followee_user_id AS user_id - FROM follows f - JOIN input_users i ON i.user_id = f.followee_user_id - WHERE @my_id > 0 - AND f.follower_user_id = @my_id - AND f.is_delete = false - GROUP BY f.followee_user_id -), -current_user_subscribed_targets AS ( - SELECT s.user_id - FROM subscriptions s - JOIN input_users i ON i.user_id = s.user_id - WHERE @my_id > 0 - AND s.subscriber_id = @my_id - AND s.is_delete = false - GROUP BY s.user_id -), -targets_following_current_user AS ( - SELECT f.follower_user_id AS user_id - FROM follows f - JOIN input_users i ON i.user_id = f.follower_user_id - WHERE @my_id > 0 - AND f.followee_user_id = @my_id - AND f.is_delete = false - GROUP BY f.follower_user_id -), -current_user_followee_follow_counts AS ( - SELECT target.user_id, count(*)::bigint AS count - FROM current_user_followees mf - JOIN LATERAL ( - SELECT f.followee_user_id AS user_id - FROM follows f - WHERE f.follower_user_id = mf.followee_user_id - AND f.followee_user_id = ANY(@ids::int[]) - AND f.followee_user_id != @my_id - AND f.is_delete = false - OFFSET 0 - ) target ON TRUE - GROUP BY target.user_id -) SELECT album_count, artist_pick_track_id, @@ -139,17 +88,10 @@ SELECT is_storage_v2, creator_node_endpoint, - -- "Of the people I follow, how many also follow this user?" - -- - -- Compute viewer relationship state once for the whole @ids batch. The - -- previous correlated shape repeated the mutual-follow count for every - -- returned user, which multiplied the viewer followee scan by result size. - -- The CTE above drives from the viewer's followees once and probes the - -- target batch via the fanout index. - COALESCE(current_user_followee_follow_counts.count, 0)::bigint AS current_user_followee_follow_count, - (current_user_following_targets.user_id IS NOT NULL)::bool AS does_current_user_follow, - (current_user_subscribed_targets.user_id IS NOT NULL)::bool AS does_current_user_subscribe, - (targets_following_current_user.user_id IS NOT NULL)::bool AS does_follow_current_user, + 0::bigint AS current_user_followee_follow_count, + false::bool AS does_current_user_follow, + false::bool AS does_current_user_subscribe, + false::bool AS does_follow_current_user, handle_lc, u.updated_at, @@ -230,14 +172,6 @@ LEFT JOIN sol_claimable_accounts spl_user_bank LEFT JOIN sol_claimable_accounts usdc_user_bank ON usdc_user_bank.ethereum_address = u.wallet AND usdc_user_bank.mint = 'EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v' -- USDC -LEFT JOIN current_user_followee_follow_counts - ON current_user_followee_follow_counts.user_id = u.user_id -LEFT JOIN current_user_following_targets - ON current_user_following_targets.user_id = u.user_id -LEFT JOIN current_user_subscribed_targets - ON current_user_subscribed_targets.user_id = u.user_id -LEFT JOIN targets_following_current_user - ON targets_following_current_user.user_id = u.user_id WHERE u.user_id = ANY(@ids::int[]) ORDER BY u.user_id ; diff --git a/api/dbv1/social_sets.go b/api/dbv1/social_sets.go new file mode 100644 index 00000000..8df78736 --- /dev/null +++ b/api/dbv1/social_sets.go @@ -0,0 +1,260 @@ +package dbv1 + +import ( + "context" + "errors" + "sort" + + "github.com/RoaringBitmap/roaring" + "github.com/jackc/pgerrcode" + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgconn" +) + +func (q *Queries) hydrateCurrentUserRelationships(ctx context.Context, myID int32, users map[int32]User) error { + if myID <= 0 || len(users) == 0 { + return nil + } + + targetIDs := make([]int32, 0, len(users)) + for id := range users { + targetIDs = append(targetIDs, id) + } + sort.Slice(targetIDs, func(i, j int) bool { return targetIDs[i] < targetIDs[j] }) + + myFollowees, err := q.loadFolloweesBitmap(ctx, myID) + if err != nil { + return err + } + + targetFollowers, err := q.loadFollowerBitmaps(ctx, targetIDs) + if err != nil { + return err + } + + subscribed, err := q.loadSubscribedTargetIDs(ctx, myID, targetIDs) + if err != nil { + return err + } + + for _, id := range targetIDs { + user := users[id] + followers := targetFollowers[id] + user.DoesCurrentUserFollow = myFollowees.Contains(uint32(id)) + user.DoesCurrentUserSubscribe = subscribed[id] + user.DoesFollowCurrentUser = followers.Contains(uint32(myID)) + if id != myID { + user.CurrentUserFolloweeFollowCount = int64(roaring.And(myFollowees, followers).GetCardinality()) + } + users[id] = user + } + + return nil +} + +func (q *Queries) loadFolloweesBitmap(ctx context.Context, userID int32) (*roaring.Bitmap, error) { + var data []byte + err := q.db.QueryRow(ctx, ` + SELECT followees_bitmap + FROM user_social_sets + WHERE user_id = $1 + `, userID).Scan(&data) + if err == nil { + bitmap, err := decodeSocialBitmap(data) + if err == nil { + return bitmap, nil + } + } + if err != nil && !errors.Is(err, pgx.ErrNoRows) && !isUndefinedTable(err) { + return nil, err + } + return q.loadFolloweesBitmapFromFollows(ctx, userID) +} + +func (q *Queries) loadFollowerBitmaps(ctx context.Context, userIDs []int32) (map[int32]*roaring.Bitmap, error) { + bitmaps := make(map[int32]*roaring.Bitmap, len(userIDs)) + loaded := make(map[int32]bool, len(userIDs)) + for _, userID := range userIDs { + bitmaps[userID] = roaring.NewBitmap() + } + + rows, err := q.db.Query(ctx, ` + SELECT user_id, followers_bitmap + FROM user_social_sets + WHERE user_id = ANY($1::int[]) + `, userIDs) + if err != nil { + if isUndefinedTable(err) { + return q.loadFollowerBitmapsFromFollows(ctx, userIDs) + } + return nil, err + } + defer rows.Close() + + for rows.Next() { + var userID int32 + var data []byte + if err := rows.Scan(&userID, &data); err != nil { + return nil, err + } + bitmap, err := decodeSocialBitmap(data) + if err != nil { + continue + } + bitmaps[userID] = bitmap + loaded[userID] = true + } + if err := rows.Err(); err != nil { + return nil, err + } + + missing := make([]int32, 0) + for _, userID := range userIDs { + if !loaded[userID] { + missing = append(missing, userID) + } + } + if len(missing) == 0 { + return bitmaps, nil + } + + rawBitmaps, err := q.loadFollowerBitmapsFromFollows(ctx, missing) + if err != nil { + return nil, err + } + for userID, bitmap := range rawBitmaps { + bitmaps[userID] = bitmap + } + + return bitmaps, nil +} + +func (q *Queries) loadFolloweesBitmapFromFollows(ctx context.Context, userID int32) (*roaring.Bitmap, error) { + rows, err := q.db.Query(ctx, ` + SELECT followee_user_id + FROM follows + WHERE follower_user_id = $1 + AND is_delete = false + `, userID) + if err != nil { + return nil, err + } + defer rows.Close() + + bitmap := roaring.NewBitmap() + for rows.Next() { + var followeeID int32 + if err := rows.Scan(&followeeID); err != nil { + return nil, err + } + bitmap.Add(uint32(followeeID)) + } + return bitmap, rows.Err() +} + +func (q *Queries) loadFollowerBitmapsFromFollows(ctx context.Context, userIDs []int32) (map[int32]*roaring.Bitmap, error) { + bitmaps := make(map[int32]*roaring.Bitmap, len(userIDs)) + for _, userID := range userIDs { + bitmaps[userID] = roaring.NewBitmap() + } + + rows, err := q.db.Query(ctx, ` + SELECT followee_user_id, follower_user_id + FROM follows + WHERE followee_user_id = ANY($1::int[]) + AND is_delete = false + `, userIDs) + if err != nil { + return nil, err + } + defer rows.Close() + + for rows.Next() { + var followeeID int32 + var followerID int32 + if err := rows.Scan(&followeeID, &followerID); err != nil { + return nil, err + } + if bitmap, ok := bitmaps[followeeID]; ok { + bitmap.Add(uint32(followerID)) + } + } + return bitmaps, rows.Err() +} + +func (q *Queries) loadSubscribedTargetIDs(ctx context.Context, myID int32, targetIDs []int32) (map[int32]bool, error) { + rows, err := q.db.Query(ctx, ` + SELECT user_id + FROM subscriptions + WHERE subscriber_id = $1 + AND user_id = ANY($2::int[]) + AND is_delete = false + GROUP BY user_id + `, myID, targetIDs) + if err != nil { + return nil, err + } + defer rows.Close() + + subscribed := make(map[int32]bool) + for rows.Next() { + var userID int32 + if err := rows.Scan(&userID); err != nil { + return nil, err + } + subscribed[userID] = true + } + return subscribed, rows.Err() +} + +func (q *Queries) RebuildUserSocialSet(ctx context.Context, userID int32) error { + followees, err := q.loadFolloweesBitmapFromFollows(ctx, userID) + if err != nil { + return err + } + + followerSets, err := q.loadFollowerBitmapsFromFollows(ctx, []int32{userID}) + if err != nil { + return err + } + + followeesData, err := encodeSocialBitmap(followees) + if err != nil { + return err + } + followersData, err := encodeSocialBitmap(followerSets[userID]) + if err != nil { + return err + } + + _, err = q.db.Exec(ctx, ` + INSERT INTO user_social_sets (user_id, followees_bitmap, followers_bitmap, updated_at) + VALUES ($1, $2, $3, CURRENT_TIMESTAMP) + ON CONFLICT (user_id) DO UPDATE SET + followees_bitmap = EXCLUDED.followees_bitmap, + followers_bitmap = EXCLUDED.followers_bitmap, + updated_at = CURRENT_TIMESTAMP + `, userID, followeesData, followersData) + return err +} + +func decodeSocialBitmap(data []byte) (*roaring.Bitmap, error) { + bitmap := roaring.NewBitmap() + if len(data) == 0 { + return bitmap, nil + } + _, err := bitmap.FromBuffer(data) + return bitmap, err +} + +func encodeSocialBitmap(bitmap *roaring.Bitmap) ([]byte, error) { + if bitmap == nil { + return roaring.NewBitmap().ToBytes() + } + return bitmap.ToBytes() +} + +func isUndefinedTable(err error) bool { + var pgErr *pgconn.PgError + return errors.As(err, &pgErr) && pgErr.Code == pgerrcode.UndefinedTable +} diff --git a/api/dbv1/users.go b/api/dbv1/users.go index b58c2ddf..1797a9a4 100644 --- a/api/dbv1/users.go +++ b/api/dbv1/users.go @@ -89,6 +89,10 @@ func (q *Queries) UsersKeyed(ctx context.Context, arg GetUsersParams) (map[int32 } } + if err := q.hydrateCurrentUserRelationships(ctx, arg.MyID, userMap); err != nil { + return nil, err + } + return userMap, nil } @@ -133,4 +137,3 @@ func squareImageStruct(maybeCids ...pgtype.Text) *SquareImage { Mirrors: rest, } } - diff --git a/api/v1_users_test.go b/api/v1_users_test.go index 8b3cd320..98490b13 100644 --- a/api/v1_users_test.go +++ b/api/v1_users_test.go @@ -5,6 +5,7 @@ import ( "testing" "api.audius.co/api/dbv1" + "github.com/RoaringBitmap/roaring" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -87,6 +88,91 @@ func TestUserQuery(t *testing.T) { } } +func TestUserQueryUsesSocialSetSnapshots(t *testing.T) { + app := testAppWithFixtures(t) + + createUserSocialSetsTable(t, app) + + myFollowees := testBitmapBytes(t, 99) + targetFollowers := testBitmapBytes(t, 99) + empty := testBitmapBytes(t) + + _, err := app.pool.Exec(t.Context(), ` + INSERT INTO user_social_sets (user_id, followees_bitmap, followers_bitmap) + VALUES + (1, $1, $3), + (3, $3, $2) + ON CONFLICT (user_id) DO UPDATE SET + followees_bitmap = EXCLUDED.followees_bitmap, + followers_bitmap = EXCLUDED.followers_bitmap + `, myFollowees, targetFollowers, empty) + require.NoError(t, err) + + users, err := app.queries.Users(t.Context(), dbv1.GetUsersParams{ + MyID: 1, + Ids: []int32{3}, + }) + require.NoError(t, err) + require.Len(t, users, 1) + + assert.False(t, users[0].DoesCurrentUserFollow) + assert.False(t, users[0].DoesFollowCurrentUser) + assert.Equal(t, int64(1), users[0].CurrentUserFolloweeFollowCount) +} + +func TestUserQuerySocialSetSnapshotsMatchFollows(t *testing.T) { + app := testAppWithFixtures(t) + params := dbv1.GetUsersParams{ + MyID: 1, + Ids: []int32{1, 2, 3}, + } + + rawUsers, err := app.queries.Users(t.Context(), params) + require.NoError(t, err) + + createUserSocialSetsTable(t, app) + for _, userID := range params.Ids { + require.NoError(t, app.queries.RebuildUserSocialSet(t.Context(), userID)) + } + + snapshotUsers, err := app.queries.Users(t.Context(), params) + require.NoError(t, err) + require.Len(t, snapshotUsers, len(rawUsers)) + + for i := range rawUsers { + assert.Equal(t, rawUsers[i].UserID, snapshotUsers[i].UserID) + assert.Equal(t, rawUsers[i].DoesCurrentUserFollow, snapshotUsers[i].DoesCurrentUserFollow) + assert.Equal(t, rawUsers[i].DoesCurrentUserSubscribe, snapshotUsers[i].DoesCurrentUserSubscribe) + assert.Equal(t, rawUsers[i].DoesFollowCurrentUser, snapshotUsers[i].DoesFollowCurrentUser) + assert.Equal(t, rawUsers[i].CurrentUserFolloweeFollowCount, snapshotUsers[i].CurrentUserFolloweeFollowCount) + } +} + +func createUserSocialSetsTable(t *testing.T, app *ApiServer) { + t.Helper() + + _, err := app.pool.Exec(t.Context(), ` + CREATE TABLE IF NOT EXISTS user_social_sets ( + user_id integer PRIMARY KEY, + followees_bitmap bytea NOT NULL DEFAULT '\x'::bytea, + followers_bitmap bytea NOT NULL DEFAULT '\x'::bytea, + updated_at timestamp without time zone NOT NULL DEFAULT CURRENT_TIMESTAMP + ) + `) + require.NoError(t, err) +} + +func testBitmapBytes(t *testing.T, ids ...uint32) []byte { + t.Helper() + bitmap := roaring.NewBitmap() + for _, id := range ids { + bitmap.Add(id) + } + data, err := bitmap.ToBytes() + require.NoError(t, err) + return data +} + func TestGetUsers(t *testing.T) { app := testAppWithFixtures(t) var userResponse struct { diff --git a/ddl/migrations/0208_user_social_sets.sql b/ddl/migrations/0208_user_social_sets.sql new file mode 100644 index 00000000..ceea7fa6 --- /dev/null +++ b/ddl/migrations/0208_user_social_sets.sql @@ -0,0 +1,90 @@ +CREATE TABLE IF NOT EXISTS user_social_sets ( + user_id integer PRIMARY KEY, + followees_bitmap bytea NOT NULL DEFAULT '\x'::bytea, + followers_bitmap bytea NOT NULL DEFAULT '\x'::bytea, + updated_at timestamp without time zone NOT NULL DEFAULT CURRENT_TIMESTAMP +); + +CREATE TABLE IF NOT EXISTS user_social_set_dirty ( + user_id integer PRIMARY KEY, + followees_dirty boolean NOT NULL DEFAULT false, + followers_dirty boolean NOT NULL DEFAULT false, + updated_at timestamp without time zone NOT NULL DEFAULT CURRENT_TIMESTAMP +); + +CREATE INDEX IF NOT EXISTS user_social_set_dirty_updated_at_idx + ON user_social_set_dirty (updated_at, user_id); + +CREATE OR REPLACE FUNCTION mark_user_social_set_dirty() RETURNS trigger AS $$ +BEGIN + IF TG_OP = 'DELETE' THEN + INSERT INTO user_social_set_dirty (user_id, followees_dirty, followers_dirty, updated_at) + VALUES (OLD.follower_user_id, true, false, CURRENT_TIMESTAMP) + ON CONFLICT (user_id) DO UPDATE SET + followees_dirty = user_social_set_dirty.followees_dirty OR EXCLUDED.followees_dirty, + followers_dirty = user_social_set_dirty.followers_dirty OR EXCLUDED.followers_dirty, + updated_at = CURRENT_TIMESTAMP; + + INSERT INTO user_social_set_dirty (user_id, followees_dirty, followers_dirty, updated_at) + VALUES (OLD.followee_user_id, false, true, CURRENT_TIMESTAMP) + ON CONFLICT (user_id) DO UPDATE SET + followees_dirty = user_social_set_dirty.followees_dirty OR EXCLUDED.followees_dirty, + followers_dirty = user_social_set_dirty.followers_dirty OR EXCLUDED.followers_dirty, + updated_at = CURRENT_TIMESTAMP; + + RETURN OLD; + END IF; + + INSERT INTO user_social_set_dirty (user_id, followees_dirty, followers_dirty, updated_at) + VALUES (NEW.follower_user_id, true, false, CURRENT_TIMESTAMP) + ON CONFLICT (user_id) DO UPDATE SET + followees_dirty = user_social_set_dirty.followees_dirty OR EXCLUDED.followees_dirty, + followers_dirty = user_social_set_dirty.followers_dirty OR EXCLUDED.followers_dirty, + updated_at = CURRENT_TIMESTAMP; + + INSERT INTO user_social_set_dirty (user_id, followees_dirty, followers_dirty, updated_at) + VALUES (NEW.followee_user_id, false, true, CURRENT_TIMESTAMP) + ON CONFLICT (user_id) DO UPDATE SET + followees_dirty = user_social_set_dirty.followees_dirty OR EXCLUDED.followees_dirty, + followers_dirty = user_social_set_dirty.followers_dirty OR EXCLUDED.followers_dirty, + updated_at = CURRENT_TIMESTAMP; + + IF TG_OP = 'UPDATE' THEN + IF OLD.follower_user_id IS DISTINCT FROM NEW.follower_user_id THEN + INSERT INTO user_social_set_dirty (user_id, followees_dirty, followers_dirty, updated_at) + VALUES (OLD.follower_user_id, true, false, CURRENT_TIMESTAMP) + ON CONFLICT (user_id) DO UPDATE SET + followees_dirty = user_social_set_dirty.followees_dirty OR EXCLUDED.followees_dirty, + followers_dirty = user_social_set_dirty.followers_dirty OR EXCLUDED.followers_dirty, + updated_at = CURRENT_TIMESTAMP; + END IF; + + IF OLD.followee_user_id IS DISTINCT FROM NEW.followee_user_id THEN + INSERT INTO user_social_set_dirty (user_id, followees_dirty, followers_dirty, updated_at) + VALUES (OLD.followee_user_id, false, true, CURRENT_TIMESTAMP) + ON CONFLICT (user_id) DO UPDATE SET + followees_dirty = user_social_set_dirty.followees_dirty OR EXCLUDED.followees_dirty, + followers_dirty = user_social_set_dirty.followers_dirty OR EXCLUDED.followers_dirty, + updated_at = CURRENT_TIMESTAMP; + END IF; + END IF; + + RETURN NEW; +END; +$$ LANGUAGE plpgsql; + +DROP TRIGGER IF EXISTS mark_user_social_set_dirty ON follows; +CREATE TRIGGER mark_user_social_set_dirty +AFTER INSERT OR UPDATE OR DELETE ON follows +FOR EACH ROW EXECUTE FUNCTION mark_user_social_set_dirty(); + +INSERT INTO user_social_set_dirty (user_id, followees_dirty, followers_dirty, updated_at) +SELECT u.user_id, au.following_count > 0, au.follower_count > 0, CURRENT_TIMESTAMP +FROM users u +JOIN aggregate_user au USING (user_id) +WHERE u.is_current = TRUE + AND (au.following_count > 0 OR au.follower_count > 0) +ON CONFLICT (user_id) DO UPDATE SET + followees_dirty = user_social_set_dirty.followees_dirty OR EXCLUDED.followees_dirty, + followers_dirty = user_social_set_dirty.followers_dirty OR EXCLUDED.followers_dirty, + updated_at = CURRENT_TIMESTAMP; diff --git a/go.mod b/go.mod index 87c28e11..7bbc05dc 100644 --- a/go.mod +++ b/go.mod @@ -8,6 +8,7 @@ require ( github.com/Doist/unfurlist v0.0.0-20250409100812-515f2735f8e5 github.com/OpenAudio/go-openaudio v1.3.1-0.20260601200759-f6b56f1a737e github.com/OpenAudio/go-openaudio/pkg/etl v1.3.1-0.20260601200759-f6b56f1a737e + github.com/RoaringBitmap/roaring v1.9.4 github.com/aquasecurity/esquery v0.2.0 github.com/axiomhq/axiom-go v0.23.0 github.com/axiomhq/hyperloglog v0.2.5 @@ -164,6 +165,7 @@ require ( github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/mostynb/zstdpool-freelist v0.0.0-20201229113212-927304c0c3b1 // indirect + github.com/mschoch/smat v0.2.0 // indirect github.com/multiformats/go-base32 v0.0.3 // indirect github.com/multiformats/go-base36 v0.1.0 // indirect github.com/multiformats/go-multibase v0.0.3 // indirect diff --git a/go.sum b/go.sum index baae113f..dda4db49 100644 --- a/go.sum +++ b/go.sum @@ -24,6 +24,8 @@ github.com/OpenAudio/go-openaudio v1.3.1-0.20260601200759-f6b56f1a737e h1:5BmK2S github.com/OpenAudio/go-openaudio v1.3.1-0.20260601200759-f6b56f1a737e/go.mod h1:wiFXmVbIUkN2D5lRshknaARCKhzbHtCBKRCZe6UOnVs= github.com/OpenAudio/go-openaudio/pkg/etl v1.3.1-0.20260601200759-f6b56f1a737e h1:woxembtT6moaH0oQeFnCxJRzTzSTVTsqbd+7H+yfCLE= github.com/OpenAudio/go-openaudio/pkg/etl v1.3.1-0.20260601200759-f6b56f1a737e/go.mod h1:LZKiU9vBYzlZzn6oPRHHLPXteBtMKQPegNH9bX9JuH8= +github.com/RoaringBitmap/roaring v1.9.4 h1:yhEIoH4YezLYT04s1nHehNO64EKFTop/wBhxv2QzDdQ= +github.com/RoaringBitmap/roaring v1.9.4/go.mod h1:6AXUsoIEzDTFFQCe1RbGA6uFONMhvejWj5rqITANK90= github.com/StackExchange/wmi v1.2.1 h1:VIkavFPXSjcnS+O8yTq7NI32k0R5Aj+v39y29VYDOSA= github.com/StackExchange/wmi v1.2.1/go.mod h1:rcmrprowKIVzvc+NUiLncP2uuArMWLCbu9SBzvHz7e8= github.com/VictoriaMetrics/fastcache v1.12.2 h1:N0y9ASrJ0F6h0QaC3o6uJb3NIZ9VKLjCM7NQbSmF7WI= @@ -51,6 +53,7 @@ github.com/bdragon300/tusgo v0.1.2/go.mod h1:C+JEnr9Mg5aMyd9hCQx6y8DGzbf2gXd3tF6 github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= +github.com/bits-and-blooms/bitset v1.12.0/go.mod h1:7hO7Gc7Pp1vODcmWvKMRA9BNmbv6a/7QIWpPxHddWR8= github.com/bits-and-blooms/bitset v1.17.0 h1:1X2TS7aHz1ELcC0yU1y2stUs/0ig5oMU6STFZGrhvHI= github.com/bits-and-blooms/bitset v1.17.0/go.mod h1:7hO7Gc7Pp1vODcmWvKMRA9BNmbv6a/7QIWpPxHddWR8= github.com/blendle/zapdriver v1.3.1 h1:C3dydBOWYRiOk+B8X9IVZ5IOe+7cl+tGOexN4QqHfpE= @@ -441,6 +444,8 @@ github.com/mostynb/zstdpool-freelist v0.0.0-20201229113212-927304c0c3b1/go.mod h github.com/mr-tron/base58 v1.1.0/go.mod h1:xcD2VGqlgYjBdcBLw+TuYLr8afG+Hj8g2eTVqeSzSU8= github.com/mr-tron/base58 v1.2.0 h1:T/HDJBh4ZCPbU39/+c3rRvE0uKBQlU27+QI8LJ4t64o= github.com/mr-tron/base58 v1.2.0/go.mod h1:BinMc/sQntlIE1frQmRFPUoPA1Zkr8VRgBdjWI2mNwc= +github.com/mschoch/smat v0.2.0 h1:8imxQsjDm8yFEAVBe7azKmKSgzSkZXDuKkSq9374khM= +github.com/mschoch/smat v0.2.0/go.mod h1:kc9mz7DoBKqDyiRL7VZN8KvXQMWeTaVnttLRXOlotKw= github.com/multiformats/go-base32 v0.0.3 h1:tw5+NhuwaOjJCC5Pp82QuXbrmLzWg7uxlMFp8Nq/kkI= github.com/multiformats/go-base32 v0.0.3/go.mod h1:pLiuGC8y0QR3Ue4Zug5UzK9LjgbkL8NSQj0zQ5Nz/AA= github.com/multiformats/go-base36 v0.1.0 h1:JR6TyF7JjGd3m6FbLU2cOxhC0Li8z8dLNGQ89tUg4F4= diff --git a/indexer/indexer.go b/indexer/indexer.go index fef46e02..673a3056 100644 --- a/indexer/indexer.go +++ b/indexer/indexer.go @@ -157,6 +157,9 @@ func (ci *CoreIndexer) startParityJobs(ctx context.Context) { jobs.NewUpdateDelistStatusesJob(ci.Config, ci.pool). ScheduleEvery(ctx, 5*time.Minute) + jobs.NewUserSocialSetsJob(ci.Config, ci.pool). + ScheduleEvery(ctx, 10*time.Second) + // Reconcile derived challenge state from source tables. Per-challenge // scanners live in api/jobs/challenges/. jobs.NewIndexChallengesJob(ci.Config, ci.pool). diff --git a/jobs/user_social_sets.go b/jobs/user_social_sets.go new file mode 100644 index 00000000..9e3678d7 --- /dev/null +++ b/jobs/user_social_sets.go @@ -0,0 +1,178 @@ +package jobs + +import ( + "context" + "fmt" + "sync" + "time" + + dbv1 "api.audius.co/api/dbv1" + "api.audius.co/config" + "api.audius.co/database" + "api.audius.co/logging" + "github.com/jackc/pgx/v5" + "go.uber.org/zap" +) + +type UserSocialSetsJob struct { + pool database.DbPool + logger *zap.Logger + batchSize int + + mutex sync.Mutex + isRunning bool +} + +const UserSocialSetsBatchSize = 50 + +type dirtySocialSet struct { + UserID int32 + UpdatedAt time.Time +} + +func NewUserSocialSetsJob(cfg config.Config, pool database.DbPool) *UserSocialSetsJob { + return &UserSocialSetsJob{ + pool: pool, + logger: logging.NewZapLogger(cfg).Named("UserSocialSetsJob"), + batchSize: UserSocialSetsBatchSize, + } +} + +func (j *UserSocialSetsJob) WithBatchSize(n int) *UserSocialSetsJob { + j.batchSize = n + return j +} + +func (j *UserSocialSetsJob) ScheduleEvery(ctx context.Context, interval time.Duration) *UserSocialSetsJob { + go func() { + ticker := time.NewTicker(interval) + defer ticker.Stop() + for { + select { + case <-ticker.C: + j.Run(ctx) + case <-ctx.Done(): + j.logger.Info("Job shutting down") + return + } + } + }() + return j +} + +func (j *UserSocialSetsJob) Run(ctx context.Context) { + if err := j.run(ctx); err != nil { + j.logger.Error("Job run failed", zap.Error(err)) + } +} + +func (j *UserSocialSetsJob) run(ctx context.Context) error { + j.mutex.Lock() + if j.isRunning { + j.mutex.Unlock() + return fmt.Errorf("job is already running") + } + j.isRunning = true + j.mutex.Unlock() + defer func() { + j.mutex.Lock() + j.isRunning = false + j.mutex.Unlock() + }() + + selected, err := j.selectDirty(ctx) + if err != nil { + return err + } + if len(selected) < j.batchSize { + missing, err := j.selectMissing(ctx, j.batchSize-len(selected)) + if err != nil { + return err + } + selected = append(selected, missing...) + } + if len(selected) == 0 { + return nil + } + + queries := dbv1.New(j.pool) + rebuilt := 0 + for _, row := range selected { + if err := queries.RebuildUserSocialSet(ctx, row.UserID); err != nil { + return fmt.Errorf("rebuild social set for user %d: %w", row.UserID, err) + } + rebuilt++ + if !row.UpdatedAt.IsZero() { + if _, err := j.pool.Exec(ctx, ` + DELETE FROM user_social_set_dirty + WHERE user_id = @user_id + AND updated_at <= @updated_at + `, pgx.NamedArgs{ + "user_id": row.UserID, + "updated_at": row.UpdatedAt, + }); err != nil { + return err + } + } + } + + j.logger.Info("Rebuilt user social sets", zap.Int("count", rebuilt)) + return nil +} + +func (j *UserSocialSetsJob) selectDirty(ctx context.Context) ([]dirtySocialSet, error) { + rows, err := j.pool.Query(ctx, ` + SELECT user_id, updated_at + FROM user_social_set_dirty + ORDER BY updated_at ASC + LIMIT @limit + `, pgx.NamedArgs{"limit": j.batchSize}) + if err != nil { + return nil, err + } + defer rows.Close() + + selected := []dirtySocialSet{} + for rows.Next() { + var row dirtySocialSet + if err := rows.Scan(&row.UserID, &row.UpdatedAt); err != nil { + return nil, err + } + selected = append(selected, row) + } + return selected, rows.Err() +} + +func (j *UserSocialSetsJob) selectMissing(ctx context.Context, limit int) ([]dirtySocialSet, error) { + if limit <= 0 { + return nil, nil + } + + rows, err := j.pool.Query(ctx, ` + SELECT u.user_id + FROM users u + JOIN aggregate_user au USING (user_id) + LEFT JOIN user_social_sets uss USING (user_id) + LEFT JOIN user_social_set_dirty dirty USING (user_id) + WHERE u.is_current = true + AND uss.user_id IS NULL + AND dirty.user_id IS NULL + AND (au.following_count > 0 OR au.follower_count > 0) + ORDER BY u.user_id + LIMIT @limit + `, pgx.NamedArgs{"limit": limit}) + if err != nil { + return nil, err + } + defer rows.Close() + + selected := []dirtySocialSet{} + for rows.Next() { + var userID int32 + if err := rows.Scan(&userID); err != nil { + return nil, err + } + selected = append(selected, dirtySocialSet{UserID: userID}) + } + return selected, rows.Err() +} diff --git a/sql/01_schema.sql b/sql/01_schema.sql index ebfef701..5135d6a0 100644 --- a/sql/01_schema.sql +++ b/sql/01_schema.sql @@ -14688,9 +14688,92 @@ ALTER TABLE ONLY public.user_payout_wallet_history ALTER TABLE ONLY public.users ADD CONSTRAINT users_blocknumber_fkey FOREIGN KEY (blocknumber) REFERENCES public.blocks(number) ON DELETE CASCADE; +CREATE TABLE public.user_social_sets ( + user_id integer NOT NULL, + followees_bitmap bytea DEFAULT '\x'::bytea NOT NULL, + followers_bitmap bytea DEFAULT '\x'::bytea NOT NULL, + updated_at timestamp without time zone DEFAULT CURRENT_TIMESTAMP NOT NULL +); + +ALTER TABLE ONLY public.user_social_sets + ADD CONSTRAINT user_social_sets_pkey PRIMARY KEY (user_id); + +CREATE TABLE public.user_social_set_dirty ( + user_id integer NOT NULL, + followees_dirty boolean DEFAULT false NOT NULL, + followers_dirty boolean DEFAULT false NOT NULL, + updated_at timestamp without time zone DEFAULT CURRENT_TIMESTAMP NOT NULL +); + +ALTER TABLE ONLY public.user_social_set_dirty + ADD CONSTRAINT user_social_set_dirty_pkey PRIMARY KEY (user_id); + +CREATE INDEX user_social_set_dirty_updated_at_idx ON public.user_social_set_dirty USING btree (updated_at, user_id); + +CREATE FUNCTION public.mark_user_social_set_dirty() RETURNS trigger + LANGUAGE plpgsql + AS $$ +BEGIN + IF TG_OP = 'DELETE' THEN + INSERT INTO user_social_set_dirty (user_id, followees_dirty, followers_dirty, updated_at) + VALUES (OLD.follower_user_id, true, false, CURRENT_TIMESTAMP) + ON CONFLICT (user_id) DO UPDATE SET + followees_dirty = user_social_set_dirty.followees_dirty OR EXCLUDED.followees_dirty, + followers_dirty = user_social_set_dirty.followers_dirty OR EXCLUDED.followers_dirty, + updated_at = CURRENT_TIMESTAMP; + + INSERT INTO user_social_set_dirty (user_id, followees_dirty, followers_dirty, updated_at) + VALUES (OLD.followee_user_id, false, true, CURRENT_TIMESTAMP) + ON CONFLICT (user_id) DO UPDATE SET + followees_dirty = user_social_set_dirty.followees_dirty OR EXCLUDED.followees_dirty, + followers_dirty = user_social_set_dirty.followers_dirty OR EXCLUDED.followers_dirty, + updated_at = CURRENT_TIMESTAMP; + + RETURN OLD; + END IF; + + INSERT INTO user_social_set_dirty (user_id, followees_dirty, followers_dirty, updated_at) + VALUES (NEW.follower_user_id, true, false, CURRENT_TIMESTAMP) + ON CONFLICT (user_id) DO UPDATE SET + followees_dirty = user_social_set_dirty.followees_dirty OR EXCLUDED.followees_dirty, + followers_dirty = user_social_set_dirty.followers_dirty OR EXCLUDED.followers_dirty, + updated_at = CURRENT_TIMESTAMP; + + INSERT INTO user_social_set_dirty (user_id, followees_dirty, followers_dirty, updated_at) + VALUES (NEW.followee_user_id, false, true, CURRENT_TIMESTAMP) + ON CONFLICT (user_id) DO UPDATE SET + followees_dirty = user_social_set_dirty.followees_dirty OR EXCLUDED.followees_dirty, + followers_dirty = user_social_set_dirty.followers_dirty OR EXCLUDED.followers_dirty, + updated_at = CURRENT_TIMESTAMP; + + IF TG_OP = 'UPDATE' THEN + IF OLD.follower_user_id IS DISTINCT FROM NEW.follower_user_id THEN + INSERT INTO user_social_set_dirty (user_id, followees_dirty, followers_dirty, updated_at) + VALUES (OLD.follower_user_id, true, false, CURRENT_TIMESTAMP) + ON CONFLICT (user_id) DO UPDATE SET + followees_dirty = user_social_set_dirty.followees_dirty OR EXCLUDED.followees_dirty, + followers_dirty = user_social_set_dirty.followers_dirty OR EXCLUDED.followers_dirty, + updated_at = CURRENT_TIMESTAMP; + END IF; + + IF OLD.followee_user_id IS DISTINCT FROM NEW.followee_user_id THEN + INSERT INTO user_social_set_dirty (user_id, followees_dirty, followers_dirty, updated_at) + VALUES (OLD.followee_user_id, false, true, CURRENT_TIMESTAMP) + ON CONFLICT (user_id) DO UPDATE SET + followees_dirty = user_social_set_dirty.followees_dirty OR EXCLUDED.followees_dirty, + followers_dirty = user_social_set_dirty.followers_dirty OR EXCLUDED.followers_dirty, + updated_at = CURRENT_TIMESTAMP; + END IF; + END IF; + + RETURN NEW; +END; +$$; + +CREATE TRIGGER mark_user_social_set_dirty AFTER INSERT OR DELETE OR UPDATE ON public.follows FOR EACH ROW EXECUTE FUNCTION public.mark_user_social_set_dirty(); + -- -- PostgreSQL database dump complete -- -