Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/acs-tax-unit-linking.fixed
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Speed up ACS spouse and parent inference in dataset builds.
257 changes: 149 additions & 108 deletions policyengine_us_data/datasets/acs/acs_to_cps_columns.py
Original file line number Diff line number Diff line change
Expand Up @@ -324,81 +324,109 @@ def _infer_spouse_lines(
line_no: pd.Series,
age: pd.Series,
) -> tuple[pd.Series, pd.Series]:
spouse_line = pd.Series(0, index=person.index, dtype=int)
imputed = pd.Series(False, index=person.index, dtype=bool)
mar = _numeric(person, "MAR").astype(int)

frame = pd.DataFrame(
{
"household_id": household_id,
"line_no": line_no,
"age": age,
"rel": rel,
"mar": mar,
"sex": _numeric(person, "SEX").astype(int),
},
index=person.index,
)
n = len(person)
spouse_line = np.zeros(n, dtype=np.int64)
imputed = np.zeros(n, dtype=bool)
positions = pd.Series(np.arange(n), index=person.index)
rel_values = rel.to_numpy(dtype=np.int64, copy=False)
household_values = household_id.to_numpy(dtype=np.int64, copy=False)
line_values = line_no.to_numpy(dtype=np.int64, copy=False)
age_values = age.to_numpy(dtype=np.int64, copy=False)
mar_values = mar.to_numpy(dtype=np.int64, copy=False)
reference_codes = np.fromiter(_reference_codes(relationship_system), dtype=np.int64)
spouse_codes = np.fromiter(_spouse_codes(relationship_system), dtype=np.int64)

for _, household_positions in positions.groupby(household_values, sort=False):
household_index = household_positions.to_numpy(dtype=np.int64, copy=False)
household_rel = rel_values[household_index]
reference_positions = household_index[np.isin(household_rel, reference_codes)]
if len(reference_positions):
reference_pos = int(reference_positions[0])
else:
reference_pos = int(
household_index[np.argmin(line_values[household_index])]
)
reference_line = int(line_values[reference_pos])

for _, household in frame.groupby("household_id", sort=False):
reference = household[
household["rel"].isin(_reference_codes(relationship_system))
direct_spouse_positions = household_index[
np.isin(household_rel, spouse_codes) & (mar_values[household_index] == 1)
]
if reference.empty:
reference = household[household["line_no"] == household["line_no"].min()]
reference_index = reference.index[0]
reference_line = int(frame.loc[reference_index, "line_no"])

direct_spouses = household[
household["rel"].isin(_spouse_codes(relationship_system))
& (household["mar"] == 1)
if len(direct_spouse_positions) and mar_values[reference_pos] == 1:
spouse_pos = int(
direct_spouse_positions[np.argmin(line_values[direct_spouse_positions])]
)
spouse_line[reference_pos] = int(line_values[spouse_pos])
spouse_line[spouse_pos] = reference_line

unlinked = household_index[
(mar_values[household_index] == 1)
& (age_values[household_index] >= 18)
& (spouse_line[household_index] <= 0)
]
if not direct_spouses.empty and frame.loc[reference_index, "mar"] == 1:
spouse_index = direct_spouses.sort_values("line_no").index[0]
spouse_line.loc[reference_index] = int(frame.loc[spouse_index, "line_no"])
spouse_line.loc[spouse_index] = reference_line

unlinked = household[
(household["mar"] == 1)
& (household["age"] >= 18)
& (spouse_line.loc[household.index] <= 0)
].copy()
remaining = set(unlinked.index)
for index in sorted(remaining, key=lambda item: frame.loc[item, "line_no"]):
if index not in remaining:
remaining = set(int(position) for position in unlinked)
for position in sorted(remaining, key=lambda item: line_values[item]):
if position not in remaining:
continue
candidate_indexes = [
candidate for candidate in remaining if candidate != index
candidate for candidate in remaining if candidate != position
]
scored_candidates = []
for candidate in candidate_indexes:
score = _spouse_pair_score(
frame.loc[index],
frame.loc[candidate],
relationship_system,
score = _spouse_pair_score_values(
rel_a=rel_values[position],
rel_b=rel_values[candidate],
age_a=age_values[position],
age_b=age_values[candidate],
line_a=line_values[position],
line_b=line_values[candidate],
relationship_system=relationship_system,
)
if score is not None:
scored_candidates.append((score, candidate))
if not scored_candidates:
continue
_, spouse_index = max(scored_candidates)
spouse_line.loc[index] = int(frame.loc[spouse_index, "line_no"])
spouse_line.loc[spouse_index] = int(frame.loc[index, "line_no"])
imputed.loc[[index, spouse_index]] = True
remaining.discard(index)
remaining.discard(spouse_index)

return spouse_line, imputed
_, spouse_pos = max(scored_candidates)
spouse_line[position] = int(line_values[spouse_pos])
spouse_line[spouse_pos] = int(line_values[position])
imputed[[position, spouse_pos]] = True
remaining.discard(position)
remaining.discard(spouse_pos)

return pd.Series(spouse_line, index=person.index), pd.Series(
imputed, index=person.index
)


def _spouse_pair_score(
    person_a: pd.Series,
    person_b: pd.Series,
    relationship_system: str,
) -> tuple[int, int, int] | None:
    """Score two person rows as a candidate spouse pair.

    Row-based convenience wrapper: it reads the ``"rel"``, ``"age"`` and
    ``"line_no"`` entries from each row, coerces them to ``int`` and
    delegates the actual scoring to ``_spouse_pair_score_values``.

    Args:
        person_a: Row exposing ``"rel"``, ``"age"`` and ``"line_no"`` entries.
        person_b: Second row exposing the same three entries.
        relationship_system: Forwarded unchanged to
            ``_spouse_pair_score_values``.

    Returns:
        The result of ``_spouse_pair_score_values`` for the extracted values:
        a three-integer score tuple, or ``None``.
    """
    # No intermediate locals: every value is consumed exactly once by the
    # scalar scorer, so the fields are extracted directly in the call.
    return _spouse_pair_score_values(
        rel_a=int(person_a["rel"]),
        rel_b=int(person_b["rel"]),
        age_a=int(person_a["age"]),
        age_b=int(person_b["age"]),
        line_a=int(person_a["line_no"]),
        line_b=int(person_b["line_no"]),
        relationship_system=relationship_system,
    )


def _spouse_pair_score_values(
rel_a: int,
rel_b: int,
age_a: int,
age_b: int,
line_a: int,
line_b: int,
relationship_system: str,
) -> tuple[int, int, int] | None:
rel_a = int(rel_a)
rel_b = int(rel_b)
age_gap = abs(int(age_a) - int(age_b))
if age_gap > 20:
return None

Expand All @@ -408,9 +436,9 @@ def _spouse_pair_score(
parent_in_law_codes = _parent_in_law_codes(relationship_system)
pair = {rel_a, rel_b}
if pair & child_codes and pair & child_in_law_codes:
return (100, -age_gap, -min(int(person_a["line_no"]), int(person_b["line_no"])))
return (100, -age_gap, -min(int(line_a), int(line_b)))
if pair & parent_codes and pair & parent_in_law_codes:
return (90, -age_gap, -min(int(person_a["line_no"]), int(person_b["line_no"])))
return (90, -age_gap, -min(int(line_a), int(line_b)))
return None


Expand All @@ -423,63 +451,76 @@ def _infer_parent_lines(
age: pd.Series,
spouse_line: pd.Series,
) -> tuple[pd.Series, pd.Series, pd.Series]:
parent1 = pd.Series(0, index=person.index, dtype=int)
parent2 = pd.Series(0, index=person.index, dtype=int)
imputed = pd.Series(False, index=person.index, dtype=bool)
frame = pd.DataFrame(
{
"household_id": household_id,
"line_no": line_no,
"age": age,
"rel": rel,
"spouse_line": spouse_line,
},
index=person.index,
n = len(person)
parent1 = np.zeros(n, dtype=np.int64)
parent2 = np.zeros(n, dtype=np.int64)
imputed = np.zeros(n, dtype=bool)
positions = pd.Series(np.arange(n), index=person.index)
rel_values = rel.to_numpy(dtype=np.int64, copy=False)
household_values = household_id.to_numpy(dtype=np.int64, copy=False)
line_values = line_no.to_numpy(dtype=np.int64, copy=False)
age_values = age.to_numpy(dtype=np.int64, copy=False)
spouse_values = spouse_line.to_numpy(dtype=np.int64, copy=False)
reference_codes = np.fromiter(_reference_codes(relationship_system), dtype=np.int64)
own_child_codes = np.fromiter(
_child_codes(relationship_system) | _foster_child_codes(relationship_system),
dtype=np.int64,
)
grandchild_codes = np.fromiter(
_grandchild_codes(relationship_system), dtype=np.int64
)
parent_candidate_codes = np.fromiter(
_child_codes(relationship_system) | _child_in_law_codes(relationship_system),
dtype=np.int64,
)

for _, household in frame.groupby("household_id", sort=False):
reference = household[
household["rel"].isin(_reference_codes(relationship_system))
]
if reference.empty:
reference = household[household["line_no"] == household["line_no"].min()]
reference_index = reference.index[0]
reference_line = int(frame.loc[reference_index, "line_no"])
reference_spouse_line = int(frame.loc[reference_index, "spouse_line"])

own_child_mask = household["rel"].isin(
_child_codes(relationship_system) | _foster_child_codes(relationship_system)
)
for index in household[own_child_mask].index:
parent1.loc[index] = reference_line
if reference_spouse_line > 0:
parent2.loc[index] = reference_spouse_line

grandchild_indexes = household[
household["rel"].isin(_grandchild_codes(relationship_system))
].index
parent_candidates = household[
household["rel"].isin(
_child_codes(relationship_system)
| _child_in_law_codes(relationship_system)
for _, household_positions in positions.groupby(household_values, sort=False):
household_index = household_positions.to_numpy(dtype=np.int64, copy=False)
household_rel = rel_values[household_index]
reference_positions = household_index[np.isin(household_rel, reference_codes)]
if len(reference_positions):
reference_pos = int(reference_positions[0])
else:
reference_pos = int(
household_index[np.argmin(line_values[household_index])]
)
reference_line = int(line_values[reference_pos])
reference_spouse_line = int(spouse_values[reference_pos])

own_child_positions = household_index[np.isin(household_rel, own_child_codes)]
for position in own_child_positions:
parent1[position] = reference_line
if reference_spouse_line > 0:
parent2[position] = reference_spouse_line

grandchild_positions = household_index[np.isin(household_rel, grandchild_codes)]
parent_candidate_positions = household_index[
np.isin(household_rel, parent_candidate_codes)
]
for index in grandchild_indexes:
possible = parent_candidates[
(parent_candidates["age"] - frame.loc[index, "age"]).between(15, 55)
].copy()
if possible.empty:
for position in grandchild_positions:
age_gap = age_values[parent_candidate_positions] - age_values[position]
possible_positions = parent_candidate_positions[
(age_gap >= 15) & (age_gap <= 55)
]
if len(possible_positions) == 0:
continue
possible["score"] = -(possible["age"] - frame.loc[index, "age"] - 30).abs()
selected_index = possible.sort_values(
["score", "age", "line_no"],
ascending=[False, False, True],
).index[0]
selected_line = int(frame.loc[selected_index, "line_no"])
parent1.loc[index] = selected_line
selected_spouse_line = int(frame.loc[selected_index, "spouse_line"])
selected_pos = max(
(int(candidate) for candidate in possible_positions),
key=lambda candidate: (
-abs(age_values[candidate] - age_values[position] - 30),
age_values[candidate],
-line_values[candidate],
),
)
selected_line = int(line_values[selected_pos])
parent1[position] = selected_line
selected_spouse_line = int(spouse_values[selected_pos])
if selected_spouse_line > 0:
parent2.loc[index] = selected_spouse_line
imputed.loc[index] = True
parent2[position] = selected_spouse_line
imputed[position] = True

return parent1, parent2, imputed
return (
pd.Series(parent1, index=person.index),
pd.Series(parent2, index=person.index),
pd.Series(imputed, index=person.index),
)
Loading