ScyllaDB는 Cassandra의 csql과 호환되도록 만들어졌기 때문에 Cassandra 클라이언트를 사용해 통신 가능하다. 이번 글에서는 Spring Data Cassandra를 통해 ScyllaDB의 데이터를 커서 기반으로 페이징하는 방법을 알아본다.
개념
Cassandra는 OFFSET 기능을 지원하지 않고, LIMIT과 PagingState를 활용한 Cursor-based Pagination을 제공한다. 이 때 Cursor 역할을 하는 데이터를 PagingState라고 부른다. Cassandra는 현재까지 읽은 위치 정보를 담은 PagingState를 클라이언트에게 넘긴다. 클라이언트는 다음 페이지를 요청할 때 PagingState를 page size와 함께 보내 이전에 읽은 위치 이후부터 데이터를 조회할 수 있다.
Spring Data Cassandra에서 페이지네이션하기 위해 제공하는 대표적인 클래스는 CassandraPageRequest 이다. CassandraPageRequest는 PageRequest를 상속받은 Pageable 구현체로, 일반적인 상황과 유사하게 page number, page size 등의 인자를 받아 데이터를 페이지네이션할 수 있다. 첫 번째 page가 아닌 두 번째 이상의 page에 접근하려 한다면 pagingState가 반드시 필요하다.
Spring Data Cassandra에서는 PagingState를 ByteBuffer 타입으로 주고받는다. 추가로, page number 값은 pagingState로 대체 가능하므로, 사용할 필요가 없다.
public static CassandraPageRequest of(Pageable current, @Nullable ByteBuffer pagingState) {
return new CassandraPageRequest(current.getPageNumber(), current.getPageSize(), current.getSort(),
pagingState != null ? CassandraScrollPosition.of(pagingState) : CassandraScrollPosition.initial(),
pagingState != null);
}
구현
이제 구체적으로 Controller, Service, Repository, Entity 레벨을 어떻게 구현해야 하는지에 대해 살펴보겠다.
특정 사용자가 작성한 피드(게시글)를 조회하기 위한 프로그램을 구현해보자. 이를 위해 설계된 데이터 구조는 다음과 같다. feeds_by_id는 feed에 대한 구체적인 정보를 가지고 있는 테이블이다. feeds_by_user는 피드의 생성자, 생성일, 아이디만 담으며 생성일을 기준으로 정렬되어 있는 테이블이다.
Cassandra는 Secondary Index 사용이 제한적이고 파티션 키 내에서만 정렬이 가능하므로 쿼리와 테이블을 1:1로 설계하는 것이 일반적이다. 만약 피드 아이디를 기준으로 피드를 조회한다고 하면, feeds_by_id 테이블에 질의를 보내면 된다. 특정 사용자가 작성한 피드를 최신순으로 보고 싶다면, feeds_by_user 테이블에 질의를 보내 피드 아이디 목록을 구한 후 구체적인 피드 정보는 다시 feeds_by_id에 질의를 보내면 된다.
CREATE TABLE IF NOT EXISTS feed_app.feeds_by_id (
feed_id text,
username text,
created_at timestamp,
updated_at timestamp
content text,
PRIMARY KEY (feed_id)
);
CREATE TABLE IF NOT EXISTS feed_app.feeds_by_user (
username text,
created_at timestamp,
feed_id text,
PRIMARY KEY (username, created_at, feed_id)
) WITH CLUSTERING ORDER BY (created_at DESC);
Repository
- Configuration 파일에 @EnableReactiveCassandraRepositories 어노테이션을 추가해주고 keyspace를 명시해주어야 Repository 파일들이 정상 동작한다.
@Configuration
@EnableReactiveCassandraRepositories
@EnableReactiveCassandraAuditing
class DBConfig : AbstractReactiveCassandraConfiguration() {
override fun getKeyspaceName(): String {
return "feed_app"
}
}
- 두 테이블에 대한 Repsitory를 각각 추가하고 필요한 메서드를 정의한다.
interface FeedByUserRepository: ReactiveCrudRepository<FeedByUser, FeedKey> {
fun findAllByKeyUsername(username: String, pageRequest: CassandraPageRequest): Mono<Slice<FeedByUser>>
}
interface FeedDBRepository : ReactiveCrudRepository<Feed, String> {
fun findByFeedId(id: String): Mono<Feed>
}
Service
- 앞서 언급했듯, feeds_by_user 테이블에 질의를 보내 피드 아이디 목록을 구한 후 구체적인 피드 정보는 다시 feeds_by_id에 질의를 보내는 로직을 수행한다.
fun getFeedsByUser(username: String, pageRequest: CassandraPageRequest): Mono<Slice<Feed>> {
return feedByUserRepository.findAllByKeyUsername(username, pageRequest)
.flatMap { slice ->
Flux.fromIterable(slice.content)
.flatMap { feedByUser -> feedDBRepository.findById(feedByUser.key.feedId) }
.collectList()
.map { feeds -> SliceImpl(feeds, slice.pageable, slice.hasNext()) }
}
}
Controller
- 사용자로부터 요청을 받아 PageRequest를 생성하고, Service 계층으로부터 응답이 오면 PageResponse를 생성해주는 로직을 수행한다.
- 이 때 PagingState 값을 HTTP body 상에는 String 타입으로 사용하고 스프링 내부에서는 ByteBuffer 타입을 사용하므로 둘 사이의 변환이 중요하다. 바이너리 타입이므로 String으로 바로 변환하면 깨지기 때문에, Base64를 통해 String 타입으로 변환해야 한다.
- 중요한 점은 HTTP URL에 pagingState를 담도록 정의했기 때문에 Base64의 Encoder, Decoder를 얻을 때 URLEncoder, URLDecoder를 사용해야 한다는 점이다. 만약 그렇지 않다면 URL에서 특수한 의미를 담는 문자(+, /) 가 인코딩 문자열에 섞여 들어가 예외가 발생한다.
@GetMapping("/{username}")
fun getUserFeeds(
@PathVariable username: String,
@RequestParam(defaultValue = "10") size: Int,
@RequestParam(required = false) pagingState: String?
): Mono<PagedFeedResponse> {
val pageRequest = createPageRequest(size, pagingState)
return feedService.getFeedsByUser(username, pageRequest)
.map { slice ->
createPageResponse(slice)
}
}
private fun createPageResponse(slice: Slice<Feed>): PagedFeedResponse =
if (slice.nextPageable().isUnpaged) {
PagedFeedResponse(
feeds = slice.content.map { toFeedResponse(it) },
size = slice.size,
nextPageable = null
)
} else {
val nextPagingState = (slice.nextPageable() as CassandraPageRequest)
.pagingState?.let { buf ->
val bytes = ByteArray(buf.remaining())
buf.get(bytes)
Base64.getUrlEncoder().encodeToString(bytes)
}
PagedFeedResponse(
feeds = slice.content.map { toFeedResponse(it) },
size = slice.size,
nextPageable = nextPagingState
)
}
private fun createPageRequest(
size: Int,
pagingState: String?
): CassandraPageRequest {
val pageRequest = pagingState?.let {
CassandraPageRequest.of(
PageRequest.ofSize(size),
ByteBuffer.wrap(Base64.getUrlDecoder().decode(pagingState))
)
}
?: CassandraPageRequest.first(size)
return pageRequest
}
사용해보기
첫 페이지를 조회하려는 경우 요청 시 page size만 함께 전달하면 된다.

다음 페이지를 조회하려는 경우, 이전 페이지의 응답으로 왔던 pagingState를 요청에 담아 보내면 된다.

마치며
Spring Data Cassandra에서는 Cassandra Java Driver 4 버전을 도입하면서 PagingState를 ByteBuffer 타입으로 변경하였다. 아마 Cassandra Java Driver 4 버전 초기에 PagingState 클래스가 없었으므로 제거시켜버린 것 같은데, 다시 4 버전에도 PagingState 클래스가 생겼으니 이를 지원해주었으면 좋겠는 바람이다.
'Spring Boot > 개발 기록' 카테고리의 다른 글
| Spring Cacheable의 동작 원리 (0) | 2024.09.22 |
|---|---|
| Spring Boot 무중단 배포하기 (AWS ec2 + Nginx + Github self -hosted runner) (0) | 2024.03.11 |
| Spring Boot와 Spring Security 프로젝트에 Swagger 적용하기 (0) | 2022.06.10 |
| [Spring Security] JWT Tutorial (5) 회원가입, 권한 검증 (0) | 2022.04.28 |
| [Spring Security] JWT Tutorial (4) Repository 생성, 로그인 (0) | 2022.04.28 |
댓글