Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
7f136af
feat: discover and parse project-maintained affiliations
skwowet Jun 30, 2026
001b82c
fix: resolve pr review comments
skwowet Jun 30, 2026
f0efc5a
Merge branch 'main' into feat/CM-361-part-1
skwowet Jun 30, 2026
9cbf4b9
Merge branch 'main' into feat/CM-361-part-1
skwowet Jun 30, 2026
31dc447
fix: resolve pr review comments
skwowet Jun 30, 2026
e9cdd2a
feat: enhance affiliation service with new status and refactor logging
skwowet Jun 30, 2026
3e1c41c
Merge branch 'main' into feat/CM-361-part-1
skwowet Jun 30, 2026
17a99a3
fix: resolve pr review comments
skwowet Jun 30, 2026
48134bd
refactor: update glob pattern handling in AffiliationService to inclu…
skwowet Jun 30, 2026
dcb5657
fix: rm redundant check
skwowet Jun 30, 2026
10adda6
refactor: simplify repo affiliation registry retrival
skwowet Jul 1, 2026
be80ade
Merge branch 'main' into feat/CM-361-part-1
skwowet Jul 1, 2026
202dc1d
fix: resolve pr review comments
skwowet Jul 1, 2026
a101f37
fix: affiliation registry writes and expected-run reporting
skwowet Jul 1, 2026
2da0d4f
refactor: batch affiliation filename search like maintainers
skwowet Jul 1, 2026
3d1cd30
fix: resolve pr review comments
skwowet Jul 1, 2026
ce8d853
fix: make prettier and linter happy
skwowet Jul 1, 2026
8854ce3
refactor: retry malformed affiliation parses once before unusable
skwowet Jul 1, 2026
f1addda
Merge branch 'main' into feat/CM-361-part-1
skwowet Jul 2, 2026
17b6793
feat: support affiliation stints and improve extraction coverage
skwowet Jul 2, 2026
3bb2b9e
fix: prefer email over github and resolve git emails via username ide…
skwowet Jul 2, 2026
c1cea25
fix: change date fields in AffiliationOrganizationFields to string ty…
skwowet Jul 2, 2026
1790067
Merge branch 'main' into feat/CM-361-part-1
skwowet Jul 2, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
283 changes: 283 additions & 0 deletions services/apps/git_integration/src/crowdgit/database/crud.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
from datetime import datetime, timezone

from loguru import logger
from pydantic import TypeAdapter
from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_fixed

from crowdgit.enums import RepositoryPriority, RepositoryState
from crowdgit.errors import RepoLockingError
from crowdgit.models.affiliation_info import AffiliationInfoItem
from crowdgit.models.repository import Repository
from crowdgit.models.service_execution import ServiceExecution
from crowdgit.settings import (
Expand Down Expand Up @@ -524,3 +526,284 @@ async def save_service_execution(service_execution: ServiceExecution) -> None:
f"error: {e}"
)
# Do not re-raise - we don't want metrics saving to disrupt main operations


_AFFILIATION_SNAPSHOT_ADAPTER = TypeAdapter(list[AffiliationInfoItem])


def parse_affiliation_snapshot(snapshot) -> list[AffiliationInfoItem]:
if isinstance(snapshot, dict) and "affiliations" in snapshot:
snapshot = snapshot["affiliations"]
return _AFFILIATION_SNAPSHOT_ADAPTER.validate_python(snapshot)
Comment thread
skwowet marked this conversation as resolved.
Outdated


def dump_affiliation_snapshot(affiliations: list[AffiliationInfoItem]) -> list[dict]:
return [item.model_dump() for item in affiliations]


async def get_repo_affiliation_registry(repo_id: str):
sql_query = """
SELECT "filePath", "fileSha", "status", "snapshot", "lastRunAt"
FROM git."repoAffiliationRegistry"
WHERE "repoId" = $1
"""
result = await fetchrow(sql_query, (repo_id,))
if not result:
return None

row = dict(result)
snapshot = row.get("snapshot")
if snapshot is not None:
snapshot = parse_affiliation_snapshot(snapshot)

return {
"file_path": row.get("filePath"),
"file_sha": row.get("fileSha"),
"status": row.get("status"),
"snapshot": snapshot,
"last_run_at": row.get("lastRunAt"),
}


async def upsert_repo_affiliation_registry(
repo_id: str,
*,
file_path: str | None,
file_sha: str | None,
status: str,
snapshot: list[AffiliationInfoItem] | None,
) -> None:
snapshot_json = dump_affiliation_snapshot(snapshot) if snapshot is not None else None
sql_query = """
INSERT INTO git."repoAffiliationRegistry" (
"repoId", "filePath", "fileSha", "status", "snapshot", "lastRunAt", "updatedAt"
)
VALUES ($1, $2, $3, $4, $5, NOW(), NOW())
ON CONFLICT ("repoId") DO UPDATE SET
"filePath" = EXCLUDED."filePath",
"fileSha" = EXCLUDED."fileSha",
"status" = EXCLUDED."status",
"snapshot" = EXCLUDED."snapshot",
"lastRunAt" = NOW(),
"updatedAt" = NOW()
"""
await execute(
sql_query,
(repo_id, file_path, file_sha, status, snapshot_json),
)


async def find_many_member_ids_by_identities(identities: list[dict]) -> list[dict]:
if not identities:
return []

values_parts: list[str] = []
params: list[str | bool | int] = []
param_index = 1
for idx, identity in enumerate(identities):
values_parts.append(
f"(${param_index}, ${param_index + 1}, ${param_index + 2}, ${param_index + 3}, ${param_index + 4})"
)
params.extend(
[
idx,
identity["type"],
identity.get("verified", True),
identity.get("platform"),
identity["value"],
]
)
param_index += 5
Comment on lines +605 to +619

matches_by_idx: dict[int, set[str]] = {}
rows = await query(
f"""
WITH input_identities (idx, identity_type, verified, platform, value) AS (
VALUES {", ".join(values_parts)}
)
SELECT i.idx, mi."memberId"
FROM input_identities i
LEFT JOIN "memberIdentities" mi
ON mi.type = i.identity_type
AND mi.verified = i.verified
AND lower(mi.value) = lower(i.value)
AND (i.platform IS NULL OR mi.platform = i.platform)
AND mi."deletedAt" IS NULL
ORDER BY i.idx
""",
tuple(params),
)
for row in rows:
if row["memberId"] is None:
continue
matches_by_idx.setdefault(row["idx"], set()).add(str(row["memberId"]))

results: list[dict] = []
for idx, identity in enumerate(identities):
member_ids = matches_by_idx.get(idx, set())
member_id = next(iter(member_ids)) if len(member_ids) == 1 else None
results.append(
{
"type": identity["type"],
"platform": identity.get("platform"),
"value": identity["value"],
"verified": identity.get("verified", True),
"member_id": member_id,
}
)

return results


async def find_many_organization_ids_by_identities(identities: list[dict]) -> list[dict]:
if not identities:
return []

values_parts: list[str] = []
params: list[str | bool | int] = []
param_index = 1
for idx, identity in enumerate(identities):
values_parts.append(f"(${param_index}, ${param_index + 1}, ${param_index + 2}, ${param_index + 3})")
params.extend(
[
idx,
identity["type"],
identity.get("verified", True),
identity["value"],
]
)
param_index += 4

matches_by_idx: dict[int, set[str]] = {}
rows = await query(
f"""
WITH input_identities (idx, identity_type, verified, value) AS (
VALUES {", ".join(values_parts)}
)
SELECT i.idx, oi."organizationId"
FROM input_identities i
LEFT JOIN "organizationIdentities" oi
ON oi.type = i.identity_type
AND oi.verified = i.verified
AND lower(oi.value) = lower(i.value)
ORDER BY i.idx
""",
tuple(params),
)
for row in rows:
if row["organizationId"] is None:
continue
matches_by_idx.setdefault(row["idx"], set()).add(str(row["organizationId"]))

results: list[dict] = []
for idx, identity in enumerate(identities):
organization_ids = matches_by_idx.get(idx, set())
organization_id = next(iter(organization_ids)) if len(organization_ids) == 1 else None
results.append(
{
"type": identity["type"],
"value": identity["value"],
"verified": identity.get("verified", True),
"organization_id": organization_id,
}
)

return results


async def fetch_member_organizations(member_ids: list[str]) -> list[dict]:
if not member_ids:
return []

return await query(
"""
SELECT "memberId", "organizationId", "dateStart", "dateEnd", source
FROM "memberOrganizations"
WHERE "memberId" = ANY($1::uuid[])
AND "deletedAt" IS NULL
""",
(member_ids,),
)


async def fetch_segment_affiliations(member_ids: list[str], segment_id: str) -> list[dict]:
"""MSA rows are per segment — filter by segment_id so guards match this repo's project."""
if not member_ids:
return []

return await query(
"""
SELECT "memberId", "segmentId", "organizationId", "dateStart", "dateEnd", verified
FROM "memberSegmentAffiliations"
WHERE "memberId" = ANY($1::uuid[])
AND "segmentId" = $2::uuid
AND "deletedAt" IS NULL
AND "organizationId" IS NOT NULL
""",
(member_ids, segment_id),
)


async def insert_member_organizations(rows: list[dict]) -> int:
if not rows:
return 0

sql_query = """
INSERT INTO "memberOrganizations"(
"memberId",
"organizationId",
"dateStart",
"dateEnd",
"title",
source,
verified,
"createdAt",
"updatedAt"
)
VALUES ($1, $2, NULL, NULL, NULL, $3, false, NOW(), NOW())
ON CONFLICT ("memberId", "organizationId", "dateStart", "dateEnd") DO NOTHING
Comment thread
skwowet marked this conversation as resolved.
Outdated
"""
await executemany(
sql_query,
[
(
row["member_id"],
row["organization_id"],
row.get("source", "project-registry"),
)
for row in rows
],
)
return len(rows)
Comment thread
skwowet marked this conversation as resolved.
Outdated


async def insert_member_segment_affiliations(rows: list[dict]) -> int:
if not rows:
return 0

sql_query = """
INSERT INTO "memberSegmentAffiliations"(
id,
"memberId",
"segmentId",
"organizationId",
"dateStart",
"dateEnd",
verified
)
VALUES (gen_random_uuid(), $1, $2, $3, NULL, NULL, $4)
"""
await executemany(
sql_query,
[
(
row["member_id"],
row["segment_id"],
row["organization_id"],
row.get("verified", False),
)
for row in rows
],
)
return len(rows)
Comment thread
skwowet marked this conversation as resolved.
Outdated

10 changes: 10 additions & 0 deletions services/apps/git_integration/src/crowdgit/enums.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ class ErrorCode(str, Enum):
NO_MAINTAINER_FOUND = "no-maintainer-found"
MAINTAINER_ANALYSIS_FAILED = "maintainer-analysis-failed"
MAINTAINER_INTERVAL_NOT_ELAPSED = "maintainer-interval-not-elapsed"
NO_AFFILIATION_FILE = "no-affiliation-file"
AFFILIATION_ANALYSIS_FAILED = "affiliation-analysis-failed"
AFFILIATION_INTERVAL_NOT_ELAPSED = "affiliation-interval-not-elapsed"
CLEANUP_FAILED = "cleanup-failed"
PARENT_REPO_INVALID = "parent-repo-invalid"
REONBOARDING_REQUIRED = "reonboarding-required"
Expand Down Expand Up @@ -67,11 +70,18 @@ class ExecutionStatus(str, Enum):
FAILURE = "failure"


class AffiliationRegistryStatus(str, Enum):
SUCCESS = "success"
NOT_FOUND = "not_found"
ERROR = "error"


class OperationType(str, Enum):
"""Service operation types for metrics tracking"""

CLONE = "Clone"
COMMIT = "Commit"
MAINTAINER = "Maintainer"
REPO_AFFILIATION = "RepoAffiliation"
SOFTWARE_VALUE = "SoftwareValue"
VULNERABILITY_SCAN = "VulnerabilityScanner"
20 changes: 20 additions & 0 deletions services/apps/git_integration/src/crowdgit/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,26 @@ class MaintainerIntervalNotElapsedError(CrowdGitError):
ai_cost: int = 0


@dataclass
class AffiliationFileNotFoundError(CrowdGitError):
error_message: str = "No affiliation file found in this repository"
error_code: ErrorCode = ErrorCode.NO_AFFILIATION_FILE
ai_cost: int = 0

Comment thread
skwowet marked this conversation as resolved.

@dataclass
class AffiliationAnalysisError(CrowdGitError):
error_message: str = "Could not parse the affiliation file"
error_code: ErrorCode = ErrorCode.AFFILIATION_ANALYSIS_FAILED


@dataclass
class AffiliationIntervalNotElapsedError(CrowdGitError):
error_message: str = "Too soon since the last affiliation run"
error_code: ErrorCode = ErrorCode.AFFILIATION_INTERVAL_NOT_ELAPSED
ai_cost: int = 0
Comment thread
Copilot marked this conversation as resolved.
Outdated


@dataclass
class ParentRepoInvalidError(CrowdGitError):
error_message: str = "Parent repository is not valid or not found"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
from pydantic import BaseModel


class AffiliationContributor(BaseModel):
email: str | None = None
name: str | None = None
github: str | None = None


class AffiliationOrganization(BaseModel):
name: str | None = None
domain: str | None = None


class AffiliationInfoItem(BaseModel):
contributor: AffiliationContributor
organization: AffiliationOrganization


class AffiliationFile(BaseModel):
file_name: str | None = None
error: str | None = None


class AffiliationParseOutput(BaseModel):
affiliations: list[AffiliationInfoItem] | None = None
error: str | None = None
Loading
Loading