NotificationAggregationService.java
package com.hwhub.batch.application.service;
import com.hwhub.batch.domain.enums.NotificationLinkType;
import com.hwhub.batch.domain.enums.NotificationType;
import com.hwhub.batch.domain.enums.ProgramType;
import com.hwhub.batch.domain.model.HouseholdMemberModel;
import com.hwhub.batch.domain.model.HouseholdModel;
import com.hwhub.batch.domain.model.UserModel;
import com.hwhub.batch.domain.model.notification.NotificationLink;
import com.hwhub.batch.domain.model.notification.NotificationMessage;
import com.hwhub.batch.domain.model.notification.NotificationModel;
import com.hwhub.batch.domain.repository.HouseholdMemeberRepository;
import com.hwhub.batch.domain.repository.HouseholdMemeberRepository.HouseholdUserPair;
import com.hwhub.batch.domain.repository.HouseholdRepository;
import com.hwhub.batch.domain.repository.NotificationEventRepository;
import com.hwhub.batch.domain.repository.NotificationRepository;
import com.hwhub.batch.domain.repository.UserRepository;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@Service
@RequiredArgsConstructor
public class NotificationAggregationService {
private static final long SYSTEM_USER_ID = 2;
// 1回のバッチで処理する最大件数
private static final int CLAIM_LIMIT = 2000;
private final NotificationRepository repository;
private final NotificationEventRepository eventRepository;
private final HouseholdRepository householdRepository;
private final HouseholdMemeberRepository householdMemeberRepository;
private final UserRepository userRepository;
public record Result(int claimedCount, int notificationsInserted, int eventsDone) {}
public record HouseholdUserKey(Long householdId, Long userId) {}
/**
* タスク割当系通知の集約
*
* <p>流れ:
*
* <ol>
* <li>PENDING -> PROCESSING (processing_key付与)
* <li>task_id単位で最新だけ採用 → target/type/date/actorで集約結果を取得
* <li>Java側で通知文言/params/link を構築して t_notification に bulkInsert
* <li>PROCESSING -> DONE
* </ol>
*/
@Transactional
public Result aggregateTaskAssignmentNotifications() {
String processingKey = UUID.randomUUID().toString();
int claimed =
eventRepository.markPendingAsProcessing(
processingKey, CLAIM_LIMIT, SYSTEM_USER_ID, ProgramType.BTC_NTF_AGGR.getCode());
if (claimed == 0) {
return new Result(0, 0, 0);
}
// イベントを集約して取得
List<NotificationModel> rows =
eventRepository.aggregateLatestTaskAssignmentEvents(processingKey);
int inserted = 0;
if (!rows.isEmpty()) {
// 名称を取得
Map<HouseholdUserKey, String> actorNameMap = buildActorNameMap(rows);
Map<Long, String> householdNameMap = buildHouseholdNameMap(rows);
// メッセージと遷移先を設定
rows.forEach(m -> setMessageAndLink(m, actorNameMap, householdNameMap));
inserted = repository.bulkInsert(rows, SYSTEM_USER_ID, ProgramType.BTC_NTF_AGGR.getCode());
}
int done =
eventRepository.markProcessingAsDone(
processingKey, SYSTEM_USER_ID, ProgramType.BTC_NTF_AGGR.getCode());
return new Result(claimed, inserted, done);
}
/**
* 指定された通知モデルに遷移先とメッセージを設定する。
*
* @param model 通知モデル
* @param actorNameMap 操作者名マップ
* @param householdNameMap おうち名マップ
*/
private void setMessageAndLink(
NotificationModel model,
Map<HouseholdUserKey, String> actorNameMap,
Map<Long, String> householdNameMap) {
// 遷移先
NotificationLink link =
new NotificationLink(NotificationLinkType.MY_TASKS, model.getHouseholdId());
// i18n keyは notificationTypeから決める
String titleKey = resolveTitleKey(model.getNotificationType());
String bodyKey = resolveBodyKey(model.getNotificationType());
// params
Map<String, Object> params = new LinkedHashMap<>();
params.put("household", householdNameMap.get(model.getHouseholdId()));
params.put("date", model.getAggregatedKey());
params.put("count", model.getAggregatedCount());
params.put(
"actorName",
actorNameMap.get(new HouseholdUserKey(model.getHouseholdId(), model.getActorUserId())));
NotificationMessage message = new NotificationMessage(titleKey, bodyKey, params);
model.setMessageAndLink(message, link);
}
/**
* おうちの表示名を取得する。
*
* @param rows 通知モデルのリスト
* @return おうち名のマップ
*/
private Map<Long, String> buildHouseholdNameMap(List<NotificationModel> rows) {
List<Long> householdIds =
rows.stream().map(NotificationModel::getHouseholdId).distinct().toList();
if (householdIds.isEmpty()) {
return Map.of();
}
return householdRepository.findByIds(householdIds).stream()
.collect(Collectors.toMap(HouseholdModel::getHouseholdId, HouseholdModel::getName));
}
/**
* 操作者の表示名を取得する。
*
* <ol>
* <li>おうちの表示名を取得する
* <li>おうちの表示名がない(既におうちから離脱しているケース)場合は、ユーザーの表示名を取得する
* </ol>
*
* @param rows 通知モデルのリスト
* @return 操作者の表示名のマップ
*/
private Map<HouseholdUserKey, String> buildActorNameMap(List<NotificationModel> rows) {
// おうちとユーザーのペアを生成
List<HouseholdUserPair> pairs =
rows.stream()
.map(r -> new HouseholdUserPair(r.getHouseholdId(), r.getActorUserId()))
.distinct()
.toList();
if (pairs.isEmpty()) {
return Map.of();
}
Map<HouseholdUserKey, String> nameMap = new HashMap<>();
// おうちごとに設定している表示名を取得
List<HouseholdMemberModel> members =
householdMemeberRepository.findByHouseholdAndUserPairs(pairs);
for (HouseholdMemberModel m : members) {
if (m.getNickname() != null && !m.getNickname().isBlank()) {
nameMap.put(new HouseholdUserKey(m.getHouseholdId(), m.getUserId()), m.getNickname());
}
}
// おうちごとに設定している表示名が未設定(乃至は離脱している)のユーザのペアを生成
List<HouseholdUserPair> missingPairs =
pairs.stream()
.filter(p -> !nameMap.containsKey(new HouseholdUserKey(p.householdId(), p.userId())))
.toList();
if (missingPairs.isEmpty()) {
return nameMap;
}
// 世帯IDを無視してユーザIDでまとめる
List<Long> missingUserIds =
missingPairs.stream().map(HouseholdUserPair::userId).distinct().toList();
// ユーザ情報を取得しユーザID、表示名のMap生成
Map<Long, String> userNameMap =
userRepository.findByIds(missingUserIds).stream()
.filter(u -> u.getDisplayName() != null && !u.getDisplayName().isBlank())
.collect(
Collectors.toMap(UserModel::getUserId, UserModel::getDisplayName, (a, b) -> a));
for (HouseholdUserPair p : missingPairs) {
String name = userNameMap.get(p.userId());
if (name != null) {
nameMap.put(new HouseholdUserKey(p.householdId(), p.userId()), name);
} else {
// 最後のfallback(あり得ない)
nameMap.put(new HouseholdUserKey(p.householdId(), p.userId()), String.valueOf(p.userId()));
}
}
return nameMap;
}
/**
* 通知種別によってメッセージtitleを設定する。
*
* @param type 通知種別
* @return メッセージtitle
*/
private String resolveTitleKey(NotificationType type) {
return switch (type) {
case TASK_ASSIGNED -> "notifications.messages.taskAssigned.title";
case BE_DUMPED_TASK -> "notifications.messages.beDumpedTasks.title";
case YOUR_TASK_WAS_TAKEN -> "notifications.messages.yourTaskWasTaken.title";
default -> "notifications.messages.generic.title";
};
}
/**
* 通知種別によってメッセージbodyを設定する。
*
* @param type 通知種別
* @return メッセージbody
*/
private String resolveBodyKey(NotificationType type) {
return switch (type) {
case TASK_ASSIGNED -> "notifications.messages.taskAssigned.body";
case BE_DUMPED_TASK -> "notifications.messages.beDumpedTasks.body";
case YOUR_TASK_WAS_TAKEN -> "notifications.messages.yourTaskWasTaken.body";
default -> "notifications.messages.generic.body";
};
}
}