Sync in GitHub events for authed GitHub users

This commit is contained in:
Zach Latta
2025-05-14 14:33:55 -04:00
parent 4557462658
commit a234faf608
9 changed files with 330 additions and 1 deletions

View File

@@ -122,6 +122,7 @@ GEM
avo-record_link_field (0.0.2)
base64 (0.2.0)
bcrypt_pbkdf (1.1.1)
bcrypt_pbkdf (1.1.1-arm64-darwin)
benchmark (0.4.0)
bigdecimal (3.1.9)
bindex (0.8.1)
@@ -531,6 +532,7 @@ PLATFORMS
aarch64-linux-musl
arm-linux-gnu
arm-linux-musl
arm64-darwin-23
arm64-darwin-24
x86_64-linux
x86_64-linux-gnu

View File

@@ -0,0 +1,14 @@
class Avo::Resources::RepoHostEvent < Avo::BaseResource
# self.includes = []
# self.attachments = []
# self.search = {
# query: -> { query.ransack(id_eq: params[:q], m: "or").result(distinct: false) }
# }
def fields
field :id, as: :id
field :user, as: :belongs_to
field :raw_event_payload, as: :code
field :provider, as: :number
end
end

View File

@@ -0,0 +1,4 @@
# This controller has been generated to enable Rails' resource routes.
# More information on https://docs.avohq.io/3.0/controllers.html
class Avo::RepoHostEventsController < Avo::ResourcesController
end

View File

@@ -0,0 +1,165 @@
require 'http' # Make sure 'http' gem is available
module RepoHost
class SyncUserEventsJob < ApplicationJob
queue_as :repo_event_syncing # A dedicated queue for these potentially long-running jobs
# MAX_API_PAGES_TO_FETCH: Max pages to fetch. GitHub's /users/{username}/events endpoint
# is limited to 300 events. If per_page=100 (as we request), this is 3 pages.
# If GitHub defaults to per_page=30, this would be 10 pages.
# This constant acts as a safeguard.
MAX_API_PAGES_TO_FETCH = 10
EVENTS_PER_PAGE = 100
discard_on ActiveJob::DeserializationError # Standard GoodJob practice
# Retry with exponential backoff for transient network issues or temporary API errors
retry_on StandardError, wait: -> (executions) { [executions * 5, 60].min.seconds }, attempts: 3
def perform(user_id:, provider:)
@user = User.find_by(id: user_id)
@provider_sym = provider.to_sym
unless @user
Rails.logger.warn "RepoHost::SyncUserEventsJob: User ##{user_id} not found. Skipping."
return
end
# Provider-specific setup
case @provider_sym
when :github
unless @user.github_access_token.present? && @user.github_username.present?
Rails.logger.warn "RepoHost::SyncUserEventsJob: User ##{@user.id} missing GitHub token or username. Skipping."
return
end
Rails.logger.info "Starting GitHub event sync for User ##{@user.id} (#{@user.github_username})"
process_github_events
# Add :gitlab case here in the future
# when :gitlab
# process_gitlab_events
else
Rails.logger.error "RepoHost::SyncUserEventsJob: Unknown provider '#{@provider_sym}' for User ##{@user.id}. Skipping."
return
end
Rails.logger.info "Finished event sync for User ##{@user.id}, Provider: #{@provider_sym}."
end
private
def process_github_events
base_api_url = "https://api.github.com/users/#{@user.github_username}/events?per_page=#{EVENTS_PER_PAGE}"
current_page = 1
pages_processed_count = 0 # Renamed from page_count to avoid confusion with current_page
newly_created_event_count_total = 0
latest_stored_event_db_created_at = RepoHostEvent
.where(user: @user, provider: @provider_sym)
.maximum(:created_at)
loop do
pages_processed_count += 1
if pages_processed_count > MAX_API_PAGES_TO_FETCH
Rails.logger.warn "RepoHost::SyncUserEventsJob: Reached max pages (#{MAX_API_PAGES_TO_FETCH}) for User ##{@user.id}. Stopping."
break
end
api_url = "#{base_api_url}&page=#{current_page}"
Rails.logger.debug "Fetching GitHub events for User ##{@user.id}, Page #{current_page}, URL: #{api_url}"
begin
response = http_client_for_github.get(api_url)
rescue HTTP::Error => e
Rails.logger.error "RepoHost::SyncUserEventsJob: HTTP Error for User ##{@user.id} on page #{current_page}: #{e.message}"
break
end
unless response.status.success?
handle_github_api_error(response, current_page)
break
end
fetched_events_json = response.parse
Rails.logger.info "RepoHost::SyncUserEventsJob: User ##{@user.id}, Page #{current_page}: API returned #{fetched_events_json.size} events."
break if fetched_events_json.empty?
events_to_create_on_this_page = []
stop_fetching_for_this_user = false
fetched_events_json.each do |gh_event_data|
original_event_id_str = gh_event_data['id'].to_s
repo_host_event_id = RepoHostEvent.construct_event_id(@provider_sym, original_event_id_str)
event_occurred_at = Time.zone.parse(gh_event_data['created_at'])
if latest_stored_event_db_created_at && event_occurred_at <= latest_stored_event_db_created_at
if RepoHostEvent.exists?(id: repo_host_event_id, user_id: @user.id)
Rails.logger.info "RepoHost::SyncUserEventsJob: Event ID #{repo_host_event_id} (occurred at #{event_occurred_at}) already exists for User ##{@user.id}. Stopping pagination."
stop_fetching_for_this_user = true
break
end
end
events_to_create_on_this_page << {
id: repo_host_event_id,
user_id: @user.id,
raw_event_payload: gh_event_data,
provider: RepoHostEvent.providers[@provider_sym],
created_at: event_occurred_at,
updated_at: Time.current
}
end
if events_to_create_on_this_page.any?
result = RepoHostEvent.import(
events_to_create_on_this_page,
on_duplicate_key_ignore: { conflict_target: [:id] },
validate: false
)
newly_created_event_count_total += result.num_inserts
Rails.logger.info "RepoHost::SyncUserEventsJob: For User ##{@user.id}, page #{current_page}: Processed #{events_to_create_on_this_page.size} events, imported #{result.num_inserts} new events."
else
Rails.logger.info "RepoHost::SyncUserEventsJob: For User ##{@user.id}, page #{current_page}: No new events to import."
end
break if stop_fetching_for_this_user
# Manual pagination: increment page number for next request
current_page += 1
sleep 1 # Be a good API citizen; basic rate limit avoidance
end # end of loop for pagination
Rails.logger.info "RepoHost::SyncUserEventsJob: User ##{@user.id} GitHub sync: Imported a total of #{newly_created_event_count_total} new events across #{pages_processed_count} API pages."
end
def http_client_for_github
HTTP.headers(
"Accept" => "application/vnd.github+json",
"Authorization" => "Bearer #{@user.github_access_token}",
"X-GitHub-Api-Version" => "2022-11-28"
).timeout(connect: 5, read: 10) # Add timeouts
end
def handle_github_api_error(response, page_number)
error_details = response.parse rescue response.body.to_s.truncate(255)
log_message = "RepoHost::SyncUserEventsJob: GitHub API Error for User ##{@user.id} on page #{page_number}: Status #{response.status}, Body: #{error_details}"
Rails.logger.error log_message
case response.status.code
when 401 # Unauthorized
Rails.logger.warn "GitHub token for User ##{@user.id} is likely invalid or expired. Sync aborted."
when 403 # Forbidden
if response.headers['X-RateLimit-Remaining'].to_i == 0
reset_time = Time.at(response.headers['X-RateLimit-Reset'].to_i)
Rails.logger.warn "GitHub API rate limit exceeded for User ##{@user.id}. Resets at #{reset_time}. Sync aborted."
else
Rails.logger.warn "GitHub API permission issue for User ##{@user.id} (e.g. fine-grained token scopes). Sync aborted."
end
when 404 # Not Found
Rails.logger.warn "GitHub user '#{@user.github_username}' (User ##{@user.id}) not found via API. Sync aborted."
when 422 # Unprocessable Entity - often if the user has been suspended
Rails.logger.warn "GitHub API returned 422 for User ##{@user.id}. User might be suspended. Sync aborted. Details: #{error_details}"
else
Rails.logger.error "Unhandled GitHub API error for User ##{@user.id}: #{response.status}. Sync aborted."
end
end
end
end

View File

@@ -0,0 +1,31 @@
class SyncAllUserRepoEventsJob < ApplicationJob
queue_as :default # Or a more specific queue like :batch_enqueueing
def perform
Rails.logger.info "Kicking off SyncAllUserRepoEventsJob"
# Identify users:
# 1. Authenticated with GitHub (have an access token and username)
# 2. Have had heartbeats in the last 6 hours
users_to_sync = User.where.not(github_access_token: nil)
.where.not(github_username: nil)
.joins(:heartbeats) # Assumes User has_many :heartbeats
.where("heartbeats.created_at >= ?", 6.hours.ago)
.distinct
if users_to_sync.empty?
Rails.logger.info "No users eligible for GitHub event sync at this time."
return
end
Rails.logger.info "Found #{users_to_sync.count} users eligible for GitHub event sync."
GoodJob::Batch.enqueue(description: "Sync GitHub events for #{users_to_sync.count} active users at #{Time.current.iso8601}") do
users_to_sync.each do |user|
# Enqueue a job for each user, specifying 'github' as the provider
RepoHost::SyncUserEventsJob.perform_later(user_id: user.id, provider: :github)
end
end
Rails.logger.info "Successfully enqueued batch for GitHub event sync."
end
end

View File

@@ -0,0 +1,36 @@
class RepoHostEvent < ApplicationRecord
belongs_to :user
# Tell ActiveRecord to use 'id' as the primary key, even though it's a string.
self.primary_key = :id
enum :provider, { github: 0, gitlab: 1 }
# Validations
validates :id, presence: true, uniqueness: true
validates :raw_event_payload, presence: true
validates :provider, presence: true
validates :created_at, presence: true # This is the event's occurrence time from the provider
# Ensure ID starts with a recognized provider prefix
validates :id, format: {
with: /\A(gh|gl)_.+\z/, # Allow gh_ or gl_ prefixes
message: "must start with a provider prefix (e.g., gh_ or gl_)"
}
# Helper scope
scope :for_user_and_provider, ->(user, provider_name) {
where(user: user, provider: providers[provider_name.to_sym])
}
# Helper to construct the prefixed ID
def self.construct_event_id(provider_name, original_event_id)
prefix = case provider_name.to_sym
when :github then "gh_"
when :gitlab then "gl_" # Example for future
else
raise ArgumentError, "Unknown provider: #{provider_name}"
end
"#{prefix}#{original_event_id}"
end
end

View File

@@ -53,6 +53,11 @@ Rails.application.configure do
cron: "0 10 * * *",
class: "ScanGithubReposJob"
},
sync_all_user_repo_events: {
cron: "0 */6 * * *", # Every 6 hours (at minute 0 of 0, 6, 12, 18 hours)
class: "SyncAllUserRepoEventsJob",
description: "Periodically syncs repository events for all eligible users."
},
cleanup_expired_email_verification_requests: {
cron: "* * * * *",
class: "CleanupExpiredEmailVerificationRequestsJob"

View File

@@ -0,0 +1,24 @@
class CreateRepoHostEvents < ActiveRecord::Migration[8.0]
def change
# id: false because we are defining a custom string primary key 'id'
create_table :repo_host_events, id: false do |t|
t.string :id, null: false, primary_key: true # Custom PK: e.g., gh_eventid123
t.references :user, null: false, foreign_key: true
t.jsonb :raw_event_payload, null: false # Stores the actual event content from GitHub
t.integer :provider, null: false, default: 0 # 0 for GitHub
# Per prompt: "created_at is created_at from gh json"
# This means the AR `created_at` field will store the event's timestamp from GitHub.
# Rails' `updated_at` will track when our DB record was last modified.
t.datetime :created_at, null: false
t.datetime :updated_at, null: false
end
# Add an index on provider for filtering
add_index :repo_host_events, :provider
# Add an index for efficiently finding the latest event for a user/provider,
# and for the "stop fetching if event exists" logic.
# The primary key `id` is already unique and indexed.
add_index :repo_host_events, [:user_id, :provider, :created_at], name: 'index_repo_host_events_on_user_provider_created_at'
end
end

50
db/schema.rb generated
View File

@@ -10,9 +10,12 @@
#
# It's strongly recommended that you check this file into your version control system.
ActiveRecord::Schema[8.0].define(version: 2025_05_09_191155) do
ActiveRecord::Schema[8.0].define(version: 2025_05_14_180503) do
create_schema "pganalyze"
# These are extensions that must be enabled in order to support this database
enable_extension "pg_catalog.plpgsql"
enable_extension "pg_stat_statements"
create_table "ahoy_events", force: :cascade do |t|
t.bigint "visit_id"
@@ -209,8 +212,10 @@ ActiveRecord::Schema[8.0].define(version: 2025_05_09_191155) do
t.integer "ysws_program", default: 0, null: false
t.datetime "deleted_at"
t.jsonb "raw_data"
t.bigint "raw_heartbeat_upload_id"
t.index ["category", "time"], name: "index_heartbeats_on_category_and_time"
t.index ["fields_hash"], name: "index_heartbeats_on_fields_hash_when_not_deleted", unique: true, where: "(deleted_at IS NULL)"
t.index ["raw_heartbeat_upload_id"], name: "index_heartbeats_on_raw_heartbeat_upload_id"
t.index ["user_id", "time"], name: "idx_heartbeats_user_time_active", where: "(deleted_at IS NULL)"
t.index ["user_id"], name: "index_heartbeats_on_user_id"
end
@@ -251,6 +256,16 @@ ActiveRecord::Schema[8.0].define(version: 2025_05_09_191155) do
t.index ["user_id"], name: "index_mailing_addresses_on_user_id"
end
create_table "physical_mails", force: :cascade do |t|
t.bigint "user_id", null: false
t.integer "mission_type", null: false
t.integer "status", default: 0, null: false
t.string "theseus_id"
t.datetime "created_at", null: false
t.datetime "updated_at", null: false
t.index ["user_id"], name: "index_physical_mails_on_user_id"
end
create_table "project_repo_mappings", force: :cascade do |t|
t.bigint "user_id", null: false
t.string "project_name", null: false
@@ -261,6 +276,24 @@ ActiveRecord::Schema[8.0].define(version: 2025_05_09_191155) do
t.index ["user_id"], name: "index_project_repo_mappings_on_user_id"
end
create_table "raw_heartbeat_uploads", force: :cascade do |t|
t.jsonb "request_headers", null: false
t.jsonb "request_body", null: false
t.datetime "created_at", null: false
t.datetime "updated_at", null: false
end
create_table "repo_host_events", id: :string, force: :cascade do |t|
t.bigint "user_id", null: false
t.jsonb "raw_event_payload", null: false
t.integer "provider", default: 0, null: false
t.datetime "created_at", null: false
t.datetime "updated_at", null: false
t.index ["provider"], name: "index_repo_host_events_on_provider"
t.index ["user_id", "provider", "created_at"], name: "index_repo_host_events_on_user_provider_created_at"
t.index ["user_id"], name: "index_repo_host_events_on_user_id"
end
create_table "sailors_log_leaderboards", force: :cascade do |t|
t.string "slack_channel_id"
t.string "slack_uid"
@@ -344,13 +377,28 @@ ActiveRecord::Schema[8.0].define(version: 2025_05_09_191155) do
t.index ["item_type", "item_id"], name: "index_versions_on_item_type_and_item_id"
end
create_table "wakatime_mirrors", force: :cascade do |t|
t.bigint "user_id", null: false
t.string "endpoint_url", default: "https://wakatime.com/api/v1", null: false
t.string "encrypted_api_key", null: false
t.datetime "last_synced_at"
t.datetime "created_at", null: false
t.datetime "updated_at", null: false
t.index ["user_id", "endpoint_url"], name: "index_wakatime_mirrors_on_user_id_and_endpoint_url", unique: true
t.index ["user_id"], name: "index_wakatime_mirrors_on_user_id"
end
add_foreign_key "api_keys", "users"
add_foreign_key "email_addresses", "users"
add_foreign_key "email_verification_requests", "users"
add_foreign_key "heartbeats", "raw_heartbeat_uploads"
add_foreign_key "heartbeats", "users"
add_foreign_key "leaderboard_entries", "leaderboards"
add_foreign_key "leaderboard_entries", "users"
add_foreign_key "mailing_addresses", "users"
add_foreign_key "physical_mails", "users"
add_foreign_key "project_repo_mappings", "users"
add_foreign_key "repo_host_events", "users"
add_foreign_key "sign_in_tokens", "users"
add_foreign_key "wakatime_mirrors", "users"
end