Strumieniowe kopiowanie i kompresja danych w Google Cloud Storage za pomocą Kotlina

· 403 słów · do przeczytania, w minut 2

Google Cloud Storage (GCS) to elastyczne, wydajne i ekonomiczne rozwiązanie do przechowywania danych w chmurze. Często zdarza się, że chcemy przenieść dane między różnymi kubełkami GCS.

CLI

Najprostszym do tego narzędziem, będzie dostarczany przez Google gsutil.

Rozpocznijmy od stworzenia źródłowego kubełka:

$ gsutil mb gs://mz-blogasek-src

Skopiujmy do niego testowy plik

$ gsutil cp 100B.txt gs://mz-blogasek-src

A teraz utwórzmy docelowy kubełek:

$ gsutil mb gs://mz-blogasek-dest

Nic nie stoi teraz na przeszkodzie, żeby skopiować przykładowy plik z jednego do drugiego kubełka:

$ gsutil cp gs://mz-blogasek-src/100B.txt gs://mz-blogasek-dest

Jeżeli naszym celem jest zsynchronizowania całych zawartości kubełków, lepszym rozwiązaniem będzie wykonanie komendy

$ gsutil rsync gs://mz-blogasek-src gs://mz-blogasek-dest

Kotlin

Użycie dostarczonej przez Google aplikacji gsutil jest w większości przypadków całkowicie wystarczające. Jednak co, jeśli podczas kopiowania plików chcielibyśmy je dodatkowo przetworzyć? A co, jeśli chcielibyśmy zautomatyzować cały proces? W takim przypadku przychodzą z pomocą Google Cloud Functions.

Usługa ta umożliwia nam wdrażanie funkcji, które reagują na określone zdarzenia. W naszym przypadku będzie to zdarzenie google.cloud.storage.object.v1.finalized, które występuje w określonym przez nas kubełku.

Funkcja napisana z wykorzystaniem Google Cloud SDK jest klasą, która implementuje CloudEventsFunction.

Interfejs ten posiada jedną metodę:

void accept(CloudEvent event) throws Exception;

Jej przykładowa implementacja, potrafiąca wydobyć szczegółowe informacje, niezbędne do wykonania interesujących nas operacji kopiowania i kompresji, może wyglądać jak poniżej:

override fun accept(event: CloudEvent?) {
    val storageObjectDataBuilder = StorageObjectData.newBuilder()
    JsonFormat.parser().merge(
        event?.data?.toBytes()?.decodeToString(),
        storageObjectDataBuilder)
    val storageObjectData = storageObjectDataBuilder.build()
}

Obiekt storageObjectData posiada wiele pól, w tym:

  • bucket – zawierający nazwę kubełka, do którego odnosi się zdarzenie,

  • name – z nazwą pliku, który pojawił się lub został zmieniony.

Dysponując tymi informacjami, możemy teraz napisać potrzebny kod. W języku Kotlin, korzystając z biblioteki Apache Commons Compress, może on wyglądać jak poniżej:

private fun compress(sourceBucketName: String, sourceFileName: String) {

Ustalamy docelowy kubełek, oraz nazwę pliku wynikowego:

val destinationBucketName = System.getenv(DESTINATION_BUCKET_ENV)
val destinationFileName = "${sourceFileName}.gz"

Tworzymy identyfikatory źródłowego oraz docelowego pliku:

val sourceBlobId = BlobId.of(sourceBucketName, sourceFileName)
val destinationBlobId = BlobId.of(destinationBucketName, destinationFileName)

Wykonujemy kopiowanie i kompresowanie właściwe:

Channels.newInputStream(storage.reader(sourceBlobId)).use { inputStream ->
    Channels.newOutputStream(
    storage.writer(
    BlobInfo.newBuilder(destinationBlobId).build())).use { outputStream ->
        GzipCompressorOutputStream(outputStream).use { gzipOutputStream ->
        val buffer = ByteArray(1024)
        var bytesRead: Int
        while (inputStream.read(buffer).also { bytesRead = it } != -1) {
            gzipOutputStream.write(buffer, 0, bytesRead)
        }
    }
    }
}

Po napisaniu funkcji, pozostaje już tylko jej wdrożenie na Google Cloud Platform (GCP). Można to zrobić, korzystając z linii poleceń:

$ gcloud functions deploy replicator_function
    --gen2
    --source=build/libs/
    --entry-point com.zamolski.storagereplicator.ReplicatorFunction
    --env-vars-file env-vars.yaml
    --runtime java17
    --memory=256MB
    --max-instances=5
    --region europe-central2
    --trigger-event-filters="type=google.cloud.storage.object.v1.finalized"
    --trigger-event-filters="bucket=mz-blogasek-src"

Kompletny kod dla powyższego przykładu znajduje się tutaj.