• Saltar a la navegación principal
  • Saltar al contenido principal
  • Saltar al pie de página
Bluetab

Bluetab

an IBM Company

  • Soluciones
    • DATA STRATEGY
    • DATA FABRIC
    • AUGMENTED ANALYTICS
  • Assets
    • TRUEDAT
    • FASTCAPTURE
  • Conócenos
  • Oficinas
    • España
    • Mexico
    • Perú
    • Colombia
  • talento
    • España
    • TALENT HUB ALICANTE
    • TALENT HUB BIZKAIA
    • TALENT HUB BARCELONA
  • Blog
  • ES
    • EN

Practices

LakeHouse Streaming en AWS con Apache Flink y Hudi (Parte 2)

octubre 4, 2023 by Bluetab

LakeHouse Streaming en AWS con Apache Flink y Hudi (Parte 2)

Alberto Jaen

AWS Cloud Engineer

Alfonso Jerez

AWS Cloud Engineer

Adrián Jiménez

AWS Cloud Engineer

Introducción

Este artículo es el segundo en una serie de publicaciones que se centran en la creación de un LakeHouse con Hudi a partir de una ingesta en streaming procesada por una aplicación Flink. El primer artículo se centra en sentar una buena base para esta plataforma, donde se desplegaron unas aplicaciones Flink con KDA (Kinesis Data Analytics) para cada tipo de formato (MoR, CoW para Hudi y JSON) que escriben el resultado de este procesamiento en unos buckets.

El envío de datos que se utiliza como input se mandaba en el anterior artículo desde una máquina en local ejecutando una aplicación de Locust, lo que puede presentar problemas a la hora de escalar y querer procesar un volumen alto de eventos. Además, las aplicaciones de Kinesis Data Analytics con Flink presentan problemas de agilidad en su modo de autoescalado. Todos estos nuevos retos serán resueltos en este artículo.

También se catalogarán estas tablas en Glue, servicio que disponibiliza un catálogo de datos en AWS, para poder acceder a estos y así realizar queries de todo tipo. Como motor de queries que consumirá estos metadatos se utilizará Athena, que proporciona una experiencia escalable, ágil y serverless para poder ejecutar queries con SQL o Spark para nuestras tablas alojadas en S3.

Por otro lado, en este artículo también se han desplegado los componentes necesarios para poder monitorizar nuestras aplicaciones y extraer así conclusiones sobre la velocidad a la que se ingestan los datos y los posibles problemas a resolver para que el procesamiento tenga la latencia requerida según los requisitos que se impongan.

Finalmente se realizará una comparativa en cuanto a rendimiento y latencia de las diferentes aplicaciones de Flink que escriben datos en los formatos de Hudi y JSON para así poder ver las diferentes ventajas e inconvenientes de estos formatos. 

Arquitectura

A continuación se puede ver la arquitectura a alto nivel que se desplegará:

Para un mayor entendimiento vamos a explicarla de izquierda a derecha. Como se puede observar, el cambio más reseñable con respecto al primer artículo es la inclusión de un cluster de Kubernetes para poder escalar los eventos que serán mandados como input de nuestra aplicación de streaming. De esta manera se podrá testear de manera exhaustiva el rendimiento de las aplicaciones de Flink dependiendo de su aprovisionamiento y sobre todo del tipo de formato y tabla en el que escriben al LakeHouse. Además, se ha disponibilizado un ALB (Application Load Balancer) que permite acceder a la interfaz de Locust para poder definir el número de usuarios a simular y cómo deben escalar estos con el tiempo. La URL para acceder a esta aparecerá como output al desplegar la infraestructura con Terraform.

Por otro lado se han realizado cambios reseñables en las aplicaciones KDA de Flink y el stream del que leen estas. Cada aplicación lee ahora como consumidores EFO (Enhanced Fan Out), de tal manera que cada una de ellas tiene un ancho de banda dedicado. La razón de este cambio y sus detalles serán explicados más en detalle en el apartado dedicado para Kinesis.

En cuanto a la monitorización y la extracción de métricas en NRT (Near Real Time) se han desplegado unas funciones lambdas que acceden a las tablas apoyándose en Athena gracias a haber registrado los metadatos de estas en el catálogo de Glue. Es importante resaltar que los metadatos de las tablas de Hudi son registrados en Glue por Flink pero en el caso de JSON se despliega un crawler que registra estas tablas en el catálogo. Este crawler se debe ejecutar manualmente para que esta tabla quede registrada en Glue.

Escalado

Kinesis Stream

Dado que el objetivo es someter la aplicación a una carga considerable de eventos por segundo, es necesario explicar cómo cada una de las piezas de la arquitectura pueden escalar de acuerdo al volumen de datos.

Como hemos comentado previamente, se ha optado por un Kinesis Stream On-Demand para automatizar el escalado de los shards durante las pruebas de carga. Es necesario tener en cuenta que estos streams pueden acomodar una tasa de escritura de hasta el 200% de lo especificado por el número de shards en un momento dado.

Una vez que el stream se encuentra por encima del 100%, aumentará automáticamente el número de shards en un plazo de 15 minutos. La única limitación por tanto es no superar el doble del volumen de escritura admitido en menos de dicho periodo.

Por otro lado, dado que se tendrán tres aplicaciones de Flink leyendo del mismo stream, las limitaciones a nivel de lectura serán el mayor problema. Un Kinesis Stream solo admite 5 llamadas GetRecord por shard por segundo. Dado que cada aplicación tiene que leer todo el stream (y por lo tanto, todos los shards), aumentar el número de shards no ayuda a solventar este problema.

La solución pasa por registrar cada una de las aplicaciones como un consumidor Enhanced Fan-Out. Esta funcionalidad de los Kinesis Stream provee a cada uno de estos consumidores con un límite individual de 5 llamadas GetRecord y 2MB por shard por segundo de lectura.

Esta configuración se realiza en el lado del consumidor, en nuestro caso mediante el conector de Kinesis para Flink:

'scan.stream.recordpublisher' = 'EFO',
'scan.stream.efo.registration' = 'EAGER/LAZY',
'scan.stream.efo.consumername' = '{consumer_name}' 

Conviene mencionar que alternativamente, es posible aumentar la latencia de lectura de nuestras aplicaciones de Flink. Por defecto Flink realiza una lectura cada 200 ms por shard, de modo que una aplicación consume completamente la cuota de lectura de un stream. Incrementando este valor a 600ms podríamos acomodar las tres aplicaciones, a costa de una mayor latencia:

scan.shard.getrecords.intervalmillis = '600' 

También se hará uso de la opción Adaptive Reads, que modifica dinámicamente el número de eventos recogidos por llamada en función del tamaño de cada record. Esto permite aprovechar los 2 MB/s por shard disponibles para cada consumidor: 

'scan.shard.adaptivereads' = 'true' 

En lo que respecta al escalado en KPUs (Kinesis Processing Unit) de Flink, se ha optado por no hacer uso del autoescalado, ya que cada proceso de escalado incurren en downtime para la aplicación. Debido a los diferentes requerimientos de cada una de las aplicaciones, las acciones de escalado en momentos inesperados podrían interrumpir las pruebas de carga. Además es interesante medir el rendimiento de escritura de cada una de las aplicaciones en igualdad de capacidad de computación.

Hudi

Timeline

Uno de los sistemas base sobre la que se sustenta el funcionamiento y características de Hudi es la timeline. Hudi guarda un registro temporal de todas las acciones que se han realizado sobre la tabla, así como el estado de esta acción.

Las principales acciones que componen la timeline son

  • Commits – escritura atómica de un conjunto de registros en la tabla en formato columnar
  • Delta Commit – similar al commit, representa una escritura de registros en forma de logs en una tabla Merge on Read
  • Compaction – compactación de las escrituras en logs (delta commits) de una tabla MoR a formato columnar
  • Cleans – borrado de versiones antiguas de archivos
  • Rollback – eliminado de los registros escritos por un commit o delta commit fallido
  • Savepoint – marca un conjunto de archivos como “guardados” para que no sean eliminados por el proceso de limpieza. Permite restaurar la tabla a un punto anterior en la timeline

Cualquiera de estas acciones pueden encontrarse en uno de estos tres estados

  1. Requested – una acción ha sido planeada sin iniciar
  2. Inflight – la acción está en proceso
  3. Completed – denota que la acción ha sido completada


Tipos de tabla

Como se ha dejado entrever en el funcionamiento de la timeline de Hudi, existen dos tipos de escritura soportados: columnar y logs. El formato columnar (parquet) constituye la forma final de una tabla de Hudi, junto con los metadatos de la timeline. Sin embargo, es posible hacer uso de las escrituras en logs (avro) para disminuir la latencia de escritura y eventualmente compactarse a formato columnar sin entorpecer la escritura.

El uso de estos métodos de escritura dan lugar a los dos tipos de tabla que Hudi pone a nuestra disposición

  • Copy on Write – las escrituras se realizan exclusivamente en formato columnar, creando un nuevo fichero con los nuevos registros de la tabla. Los datos están disponibles inmediatamente pero incurre en mayor latencia de escritura
  • Merge on Read – hace uso de la escritura en logs. Los nuevos registros son inicialmente escritos como logs, y posteriormente serán transformados a formato columnar por el proceso de compactación. Obtenemos menor latencia de escritura a costa de latencia de lectura; los nuevos registros no estarán disponibles hasta que se realice la compactación.

Tipos de Query

Para poder aprovechar las características de cada tipo de tabla, existen tres tipos de queries que se pueden realizar sobre una tabla de Hudi

  • Snapshot – obtiene la última versión de la tabla. Para las tablas MoR esto implica incurrir en un proceso de compactación para obtener los últimos registros en formato log. 
  • Read Optimized – para tablas MoR, lee sólamente los registros ya expuestos en formato columnar sin incurrir en latencia de lectura adicional.
  • Incremental – recoge únicamente los nuevos registros desde un cierto commit o compactación, facilitando la creación de pipelines incrementales. No está soportada por Athena

Integración con Glue Catalog


El conector de Hudi permite una integración nativa con el catálogo de Glue en AWS. Basta con añadir las dependencias de Hive en nuestra aplicación de Flink:

com.amazonaws.aws-java-sdk-glue
org.apache.hive.hive-common
org.apache.hive.hive-exec 

Y especificar la configuración del catálogo en el conector de Hudi:

'hive_sync.enable' = 'true',
'hive_sync.db' = '{glue_database}',
'hive_sync.table' = '{table_name}',
'hive_sync.partition_fields' = '{partition_fields}',
'hive_sync.mode' = 'glue',
'hive_sync.use_jdbc' = 'false' 

Con esta integración, la aplicación creará automáticamente las tablas en el catálogo. Como hemos mencionado anteriormente, existen distintos tipos de query para consultar una tabla de Hudi. Se crearán por tanto en el catálogo distintas tablas para soportar las diferentes consultas.

Para una tabla CoW, la tabla se consultará mediante una query Snapshot. Para MoR en cambio se pondrán a disposición dos tablas, para soportar consultas Read Optimized o Snapshot.

La principal aplicación de Glue es de soporte a las lambdas para que al ejecutar las queries mediante Athena su ejecución pueda realizarse de una forma más eficiente, rápida y segura:

  • Glue Catalog: almacenamiento centralizado de la información acerca de la organización, diseño y formato de los datos, utilizado por Athena para realizar directamente las consultas a S3 sin necesidad de tener que apoyarse en terceros para conseguir esta información
  • Automatización del Esquema: Glue rastrea y cataloga automáticamente los datos en S3, detectando y adaptando los cambios en el esquema. Esto evita posibles errores y permite la lectura de los nuevos campos en caso de que se produzcan alteraciones en los esquemas de los eventos

Configuración de Hudi

Es importante entender las configuraciones que nos ofrece Hudi para optimizar nuestra aplicación, en particular para una aplicación en Near Real Time conviene estar al tanto de las opciones disponibles. Aunque la capacidad de configuración es inmensa [1], se intentará sintetizar las que pueden ser más relevantes para una primera toma de contacto con esta tecnología.

Particionado

Apache Hudi ofrece los tipos de particionado que pueden encontrarse en otras soluciones, se detallarán las principales y se justificara la implementada:

  • Simple: particionado basado en un único campo, en este caso el campo escogido es ‘ticker’ ya que se ha identificado que es el que tiene una cardinalidad menor.
  • Particionado Compuesto: particionamiento basado en múltiples campos, podría resultar interesante escoger un campo de baja cardinalidad (ticker) y otro de cardinalidad media (fecha)
  • Particionado Dinámico: elección de la variable en base de los valores, puede resultar interesante cuando la cardinalidad de las variables puede sufrir variaciones y se quiera una actualización del particionamiento de una forma automática y flexible.

Índices

Apache Hudi cuenta con una múltiples  tipos de indexación[2], comentaremos brevemente los más comunes:

  • Bloom Index – Hace uso de un bloom filter sobre la key de los eventos, adicionalmente se puede complementar con un filtrado por rango de de key. Funciona bien cuando tratamos con una tabla donde la mayoría de cambios ocurren en las particiones más recientes o para deduplicado de eventos.
  • Simple: indexación realizada mediante la combinación de FileID y RecordKey. Recomendado cuando las operaciones Upsert no son tan frecuentes debido a la simplicidad que este ofrece.

Ambos tipos de índices pueden ser usados en su forma global

  • Índice global – Imponen la unicidad de las keys en todas las particiones de la tabla, es decir, garantizan que existirá sólamente un registro con una cierta key.
  • Índice no global – La unicidad de la key sólo es exigida a nivel de partición. Si los datos son consistentes y una key sólo va a existir en una partición, este tipo de índices ofrecen un rendimiento mucho mayor y mejor escalado.

En este caso, se ha optado por un Bloom Index, el cual es el que se toma por defecto en caso de que no se declare expresamente:

"hoodie.index.type" = "BLOOM" 

La elección de este tipo de indexación se debe a que los casos de uso que se han planteado requieren de un procesamiento de datos considerablemente alto y eficiente.

Tipos de operación

Apache Hudi ofrece varios tipos de operaciones[3] que permiten a los usuarios administrar y modificar conjuntos de datos de gran tamaño. A continuación se detallan tanto las principales operaciones realizadas en los Stress Tests como en otros escenarios:

  • Upsert – Es la operación por defecto, y ejecutará un insert o un update dependiendo de si el registro ya existe tras una búsqueda en el índice. Con esta operación la tabla no tendrá duplicados para su clave primaria.
  • Insert – Esta operación ignora la búsqueda en el índice a la hora de insertar eventos. Es la más rápida pero la tabla puede contener duplicados. Aún así es útil si se utilizan métodos auxiliares  de deduplicado, o simplemente la existencia de estos es tolerable en el caso de uso.
  • Delete: Hudi ofrece dos métodos de borrado. Soft Delete convierte a nulos los valores del evento a excepción de la key. Hard Delete ejecuta un borrado físico del evento en la tabla.
  • Bulk Insert Operación similar al Insert pero optimizada para la inserción de un gran volumen de datos, a costa de sacrificar ciertas garantías en el control del tamaño de ficheros. Escala bien para cientos de TBs en caso de bootstrap inicial de una tabla de gran tamaño.

Compactación

En el caso de usar una tabla MoR es posible configurar el ritmo de compactación de logs en parquet para buscar el equilibrio entre latencia de escritura y lectura que más convenga al caso de uso. Se pueden especificar una estrategia de tiempo o número de delta commits (o ambos) que ejecutan un proceso de compactación:

compaction.delta_commits
compaction.delta_seconds
compaction.trigger.strategy 

Acciones asíncronas

Ciertas acciones de la timeline como la compactación, limpieza, archivado y clustering pueden ser realizadas asíncronamente por la aplicación, o incluso ser relegadas a procesos auxiliares a la aplicación de escritura. Para el caso de Flink, puede ayudar a mejorar la latencia de escritura y evitar problemas de BackPressure en la aplicación:

compaction.async.enabled
hoodie.clean.async
hoodie.archive.async
hoodie.clustering.async.enabled 

Stress Tests & Insights

Al desplegar las aplicaciones, se ha procedido a realizar distintos tests variando tanto la carga máxima de eventos como la concurrencia y el grado exponencial de crecimiento de los mismos. Esto ha sido posible  gracias a la flexibilidad ofrecida por Locust al estar levantado sobre un cluster de Kubernetes, pudiendo establecer un límite máximo de concurrencia de eventos y un incremental de los mismos. En los tests se ha establecido un límite máximo de 5 a 15K usuarios simultáneos (Peak Concurrency) escalando la frecuencia de los mismos de forma lineal, desde 5 a 20 usuarios más por segundo (Spawn Rate):

Se ha procedido a monitorizar los distintos test para así sacar conclusiones del rendimiento teniendo en cuenta las características específicas de cada uno de los formatos. Las métricas en las que se han apoyado los análisis son tanto las nativas de CloudWatch Metrics (CPU & Memory Utilization, KPUs, LastCheckpoint SIze & Duration,..), como las métricas obtenidas a partir de las Lambdas que periódicamente consultan el número de eventos disponibles en los buckets y realizan cálculos del promedio de la latencia de los mismos.


Número de Eventos

A la hora de analizar el número total de eventos procesados, los cuales son enviados de forma gradual, es decir, a medida que pasa el tiempo cada vez son más los eventos que se envían por segundo, se identifica una tendencia bastante similar aunque destacan JSON y Hudi MoR sobre Hudi CoW en cuanto a la rendimiento. Cabe destacar que JSON muestra un crecimiento más estable y constante en comparación con Hudi MoR y CoW y esto se debe a que estos últimos son capaces de manejar actualizaciones incrementales en los datos.

La similitud entre JSON y Hudi MoR hace que la elección se base completamente en las características del proyecto. En caso de que los datos no sean actualizados JSON puede resultar una solución más interesante debido principalmente a su simplicidad, mientras que si hay una alta frecuencia de actualización de datos históricos, Hudi MoR puede ser una mejor solución. Esto se debe tanto a la mayor eficiencia en las tareas de lectura como por la posibilidad de registrar las distintas versiones de los datos.

 

Latencia

Debido a la dificultad de estandarizar la lógica del cálculo de la latencia entre 3 tipos de almacenamiento distintos, se ha optado por simplificarla calculandolo como la diferencia entre la hora de creación del evento y la del procesamiento en la respectiva aplicación.

Se observa un comportamiento similar entre JSON y Hudi MoR, aunque este primero de una forma más crítica, al tener una latencia inicial muy baja pero a medida que tanto el tiempo de procesamiento como el volumen de carga aumenta, esta latencia se ve negativamente afectada.

La elección entre JSON y Hudi MoR dependerá tanto de la tolerancia de fallo que tenga la aplicación como las propias características de cada uno de los formatos, en caso de que la estructura de los datos sea estable y no cambie con frecuencia,o bien, no dependa de actualizaciones incrementales y pueda lidiar con reescrituras completas, en ese caso JSON puede que sea una mejor opción.

La elección de Hudi CoW sobre MoR puede darse cuando se necesite una alta tolerancia a errores y una alta capacidad de recuperación de eventos de escritura fallidos o corrompidos.`


Uso de CPU

Al analizar el uso de CPU, se ha identificado cierta homogeneidad entre los distintos tests aun trabajando con distintas cargas de trabajo. JSON Y Hudi MoR destacan por tener los niveles de uso de CPU más bajos, ambos por distintos motivos. JSON destaca por la simplicidad al incluir directamente los nuevos datos sin necesidad de tener que lidiar con versionado de datos, mientras que MoR no consume tanta CPU ya que por sus características, el consumo mayor de CPU se hace al realizar consultas de lectura, en las tareas de escritura únicamente identifica los cambios que serán aplicados al consultarlos.

Recordar que las métricas nativas de CloudWatch únicamente nos permiten monitorizar las aplicaciones, que corresponden a las tareas de escritura. La monitorización de las tareas de lectura corresponde a las Lambdas mencionadas anteriormente. 

En este caso MoR es más beneficioso respecto a CoW, dado que el mayor consumo de CPU en MoR se produce al consultar los datos almacenados mientras que en CoW tiene lugar al actualizar los datos.

La elección entre los formatos más eficientes se deben a las necesidades del proyecto, en caso de que se requiera una mayor tolerancia al fallo, versionado de los datos y una mayor eficiencia de lectura, se optara por MoR frente a JSON, entre los dos formatos de Hudi, de nuevo, la elección dependerá de las características del proyecto, en caso de que las consultas requieran transformaciones pesadas y/o complejas se optaría por MoR, si en cambio, el proyecto requiera de una mayor integridad de datos y/o la ingesta de datos sea en batch,  resultaría más interesante CoW debido a que al trabajar con esos volúmenes de datos, el contar con copias de seguridad, en caso de surgir errores, el impacto en término de costes y tiempo de recuperación es menor.

 

Memory Utilization

JSON de nuevo destaca por tener los valores de uso de memoria más bajos aunque para la operativa de transformaciones que se realizan son relativamente altos y más teniendo en cuenta que no tiene que lidiar con la administración de versiones o la combinación de datos. Estos valores se deben a que no tiene capacidades de compresión optimizadas ni manejo eficiente de esquemas.

Respecto a Hudi, se pueden obtener unas conclusiones similares a las del apartado de uso de CPU, MoR tiene una utilización de memoria mayor que JSON debido al procesamiento de logs delta y la administración de versiones y una menor a CoW ya que la consolidación real de los datos no ocurre durante la escritura.


Last Checkpoint Size

Destacar, nuevamente, la estabilidad de JSON frente a las aplicaciones Hudi, ya que no solo muestra en los test realizados un valor inferior a ambos, si no una estabilidad que no se consigue ni con MoR ni CoW, ya que como puede apreciarse, al monitorear el tamaño de los Checkpoints, se percibe una volatilidad considerable.

La volatilidad percibida en las aplicaciones Hudi se debe principalmente a fallos surgidos en Checkpoints lo que conlleva que el Checkpoint posterior al fallido, tenga un volumen mayor. Además de esto, la volatilidad en los tamaños de los Checkpoints puede estar relacionado con las operaciones de optimización y compactación realizadas internamente que puede conllevar la compactación del estado y que esto reduzca considerablemente el tamaño del mismo.

Desafíos en el desarrollo

Read Throughput de Kinesis y EFO

Para no sobrepasar el límite de lectura sobre el Kinesis Stream se ha optado por suscribir los consumidores como Enhanced Fan-Out. En algunas pruebas en conjunto con Autoscaling esto ha dado problemas con el conector de Kinesis de Flink siendo incapaces de cerrar conexiones a la hora de escalar el cluster.


Configuración de Hudi

La configuración de Hudi ha sido otro de los puntos de fricción durante el desarrollo. Bajo cargas elevadas los procesos de compactación y limpieza son más propensos a causar problemas de Backpressure y causar errores en la aplicación. Aunque configurar estos procesos para que ocurran de forma asíncrona puede aliviar este problema, pueden surgir conflictos y desalineación entre procesos bajo cargas elevadas. Un equilibrio entre estas configuraciones y la capacidad del cluster de la aplicación son claves para el buen funcionamiento de la aplicación.

Heterogeneidad de formato

Al hacer un análisis del rendimiento de las 3 aplicaciones, se cuenta con una dificultad adicional debido a la naturaleza de los tipos de formato, teniendo esto tanto un impacto a la hora de plantear la arquitectura como en el planteamiento de las lógicas.
El distinto comportamiento de los formatos en la ingesta, complica el desarrollo de las lógicas a la hora de calcular la latencia. MoR escribe en logs previa compactación, por lo que los datos no están disponibles inmediatamente como ocurre con CoW o JSON.  Esto implica que la métrica común medible para todos los formatos es la de disponibilidad de lectura, la cual no es el principal objetivo de una tabla MoR.  


Sincronización con el Glue Catalog

Una de las grandes ventajas que nos hemos encontrado con Hudi es su capacidad para sincronizarse con el catálogo de Glue, creando las tablas y manteniéndose actualizadas sin necesidad de un crawler. Esto permite una aplicación y arquitectura más limpia que para el caso de JSON, para el cual debe ejecutarse manualmente al desplegar las aplicaciones.

Conclusiones

Los resultados de los tests muestran diferencias considerables entre los formatos JSON, Hudi MoR y CoW en términos de eficiencia, capacidad de respuesta y utilización de recursos. Se procede a analizar cada uno de los aspectos más en detalle:

  • Eficiencia de Procesamiento: JSON y Hudi MoR destacan en la mayoría de las métricas, mostrando un desempeño óptimo en términos de Latencia, CPU & Memory Utilization. Sin embargo, el comportamiento de JSON es más estable y predecible, aunque MoR cuente con ventajas sobre JSON, como por ejemplo, en la gestión de actualizaciones incrementales.
  • Resiliencia y Tolerancia a Fallos: la tolerancia a fallos es un factor muy importante en la decisión sobre la elección entre Hudi y JSON. En el caso de  MoR y CoW, dependerá del grado de criticidad, ya que a nivel general el rendimiento en tareas de escritura para MoR es superior.
  • Uso de Recursos: JSON se muestra como el más ligero, con baja utilización de CPU y memoria, debido a su simplicidad inherente. Mientras que Hudi MoR y CoW, por la naturaleza de su diseño y gestión de datos, requieren más recursos, especialmente en operaciones que involucran el manejo de versiones y la compactación de datos.

Para finalizar, resulta interesante identificar en quéque casos de uso o proyectos puede resultar más recomendable cada uno de los formatos en función de las características de los mismos y las red flags que puedan establecerse:

  • JSON: Recomendado para aplicaciones con estructuras de datos estables que no requieren actualizaciones incrementales y donde la simplicidad y la estabilidad son clave.
  • Hudi MoR: Adecuado para proyectos que requieren una gestión eficiente de actualizaciones incrementales y donde la latencia y la eficiencia en la escritura son cruciales.
  • Hudi CoW: Ideal para contextos donde la integridad de los datos es esencial, y se necesita una robusta recuperación de errores, especialmente en escenarios de ingestas en batch. 

Referencias

[1] Configuraciones Tablas Hudi. [link]

[2] Tipos de Indexacion Hudi. [link]

[3] Tipos de Operaciones Hudi. [link]

Autores

Alberto Jaen

AWS Cloud Engineer

Empecé mi carrera laboral con el desarrollo, mantenimiento y administración de bases de datos multidimensionales y Data Lakes. A partir de ahí comencé a estar interesado en plataformas de datos y arquitecturas cloud, estando certificado 3 veces en AWS y 2 con Hashicorp.

Actualmente me encuentro trabajando como un Cloud Engineer desarrollando Data Lakes y DataWarehouses con AWS para un cliente relacionado con la organización de eventos deportivos a nivel mundial.

Alfonso Jerez

AWS Cloud Engineer

Apasionado de los datos y las nuevas tecnologías, especializado como AWS Cloud Engineer en la optimización de DataWarehouses y procesos de ingesta y transformación de Data Lakes. Motivado por la mejora continua y automatización de la integración de servicios.

Colaborando activamente con el grupo de Práctica Cloud en investigaciones y desarrollo de blogs de tecnologías punteras e innovadoras tales como esta, fomentando así el continuo aprendizaje.

Adrián Jiménez

AWS Cloud Engineer

Dedicado al aprendizaje constante de nuevas tecnologías y su aplicación, disfrutando de utilizarlas en la resolución de desafíos tecnológicos. Desarrollo mi carrera como Cloud Engineer diseñando, implementando y manteniendo infraestructura en AWS.

Colaboro activamente en la Práctica Cloud, donde investigamos y experimentamos con nuevas tecnologías, buscando soluciones para los retos que enfrentan nuestros clientes.

Navegación

¿Quieres saber más de lo que ofrecemos y ver otros casos de éxito?
DESCUBRE BLUETAB

SOLUCIONES, SOMOS EXPERTOS

DATA STRATEGY
DATA FABRIC
AUGMENTED ANALYTICS

Te puede interesar

Big Data e IoT

febrero 10, 2021
LEER MÁS

Data-Drive Agriculture; Big Data, Cloud & AI aplicados

noviembre 4, 2020
LEER MÁS

Una estrategia analítica eficiente

diciembre 13, 2022
LEER MÁS

Azure Data Studio y Copilot

octubre 11, 2023
LEER MÁS

Introducción a los productos de HashiCorp

agosto 25, 2020
LEER MÁS

Del negocio físico a la explosión del On-Line

abril 7, 2021
LEER MÁS

Publicado en: Blog, Blog, Destacado, interes, Destacado, Practices

Starburst: Construyendo un futuro basado en datos.

mayo 25, 2023 by Bluetab

Starburst: Construyendo un futuro basado en datos.

Lucas Calvo

Cloud Engineer

Introducción

En este nuevo artículo vamos a hablar de uno de nuestros partners: Starburst[1]. Starburst es la versión empresarial de Trino[2] realizando nuevas integraciones, mejoras de rendimiento, una capa de seguridad y restando complejidad a la gestión con una interfaz de usuario muy fácil de usar y que te permite realizar distintas configuraciones.

Para los que no conocéis Trino, es un motor de consulta SQL distribuido open-source creado en 2012 por Facebook bajo el nombre Presto. Está diseñado para consultar grandes conjuntos de datos distribuidos en una o más fuentes de datos heterogéneas. Esto significa que podemos consultar datos que residen en diferentes sistemas de almacenamiento como HDFS, AWS S3, Google Cloud Storage o Azure Blob Storage. Trino también tiene la capacidad de federar diferentes fuentes de datos como MySQL, PostgreSQL, Cassandra, Kafka.

Con las nuevas necesidades que van saliendo de arquitecturas orientadas al Data Mesh[3], plataformas analíticas como Starburst son cada vez más importantes y nos permiten centralizar y federar distintas fuentes de datos para así tener solo un punto de entrada a nuestra información. Con esta mentalidad, podemos hacer que nuestros usuarios accedan a la plataforma de Starburst con distintos roles y distinta granularidad de acceso para que puedan consultar los distintos dominios que poseen las empresas. Además Starburst no solo se queda en la consulta de datos, sino que nos permite conectarnos con herramientas analíticas como puedes ser DBT[4] o Jupyter Notebook[5] o herramientas de reporting como Power BI[6] para sacarle más rendimiento a todos nuestros datos. Pero Starburst no solo se queda en eso, sino que nos puede ayudar en la migraciones de datos hacia el Cloud, ya que fácilmente podemos conectarnos a las fuentes de datos y sacar toda la información para volcarlas en cualquier almacenamiento del Cloud.

Como podéis observar, Starburst es capaz de analizar todos sus datos, dentro y alrededor de tu Data Lake, y se conecta a todo un ecosistema de herramientas. Por eso vamos a realizar una serie de artículos para tratar los puntos más relevantes como son el despliegue y configuración de la plataforma, integración con otras herramientas y gobierno y administración de usuarios. En este primer artículo, nos vamos a centrar en el despliegue de Starburst en Kubernetes, así como la configuración que se tiene que realizar para conectar con los distintos componentes de GCP. Además hemos añadido una capa de monitorización con Prometheus[7] y Grafana[8], donde hemos publicado un dashboard con distintas métricas importantes por si cualquier compañía quiere centralizar las métricas en Grafana. Para todo ello, nos vamos a apoyar de un repositorio que hemos creado con el levantamiento de la infraestructura y la instalación de Starburst.

¿Qué necesitas para entender este artículo?

  • Algunos conceptos sobre Terraform[9].
  • Algunos conceptos de Kubernetes.
  • Algunos conceptos de Helm.
  • Algunos conceptos de Prometheus.
  • Algunos conceptos de Grafana.
  • Un cuenta en GCP.
  • Una licencia de Starburst

Arquitectura

Como se puede observar en el diagrama, estos son los componentes que se van a desplegar para la configuración de Starburst. Como pieza central del despliegue, utilizaremos Google Kubernetes Engine. Este es el servicio administrado de orquestación de contenedores de Google. Utilizaremos Kubernetes ya que nos facilitará la gestión de Starburst y aprovecharemos las ventajas del autoscaling de Kubernetes para ampliar el número de workers de Starburst y escalar en más nodos para poder así tener más recursos de computación si tenemos algún pico de trabajo o de usuarios.

Como configuración inicial de nuestro cluster de GKE, comenzaremos con un único nodepool para facilitar el despliegue. Un nodepool es una agrupación de nodos dentro de un cluster con la misma configuración y especificaciones de tipo de máquina. En nuestro caso, nuestro nodepool se llamará `default-node-pool` y el tipo de instancia utilizada será `e2-standard-16`, que es la recomendada por Starburst, ya que el tipo de carga de trabajo necesita nodos con bastante memoria. Además de la instalación de Starburst, también desplegaremos en el cluster tanto Prometheus como Grafana.

Como hemos explicado anteriormente, Starburst está basado en Trino, que es un motor de consulta distribuido. Los principales componentes de Trino son el Coordinator y los Workers. El Coordinator de Trino es el componente responsable de analizar las sentencias, planificar las consultas y gestionar los nodos Workers de Trino. El Coordinator realiza un seguimiento de la actividad de cada Worker y orquesta la ejecución de una consulta. Los Workers son el componente responsable de ejecutar tareas y procesar datos. Los nodos Workers obtienen datos de los conectores e intercambian datos intermedios entre sí. El Coordinator es responsable de obtener los resultados de los Workers y devolver los resultados finales al cliente.

Como componentes transversales de nuestra arquitectura, también desplegaremos una red con una subnet para realizar el despliegue de nuestro cluster de GKE, así como un bucket en Cloud Storage para realizar pruebas de escritura de datos desde Starburst.

Además, como componente fuera de la arquitectura, tendremos jmeter[10], la herramienta con la que realizaremos pruebas de performance para probar la elasticidad de Starburst y poder probar el autoescalado de nuestro cluster.

Despliegue de la infraestructura

Una vez explicada la arquitectura vamos a proceder a realizar el despliegue de todos los componentes. Para ello, nos vamos a ayudar de Terraform como herramienta de IaC. Como partes importantes de este despliegue, tendremos la parte más de infraestructura tradicional que son las VPC, el cluster de GKE y la parte de Cloud Storage como hemos hablado antes, además de los componentes que desplegamos en Kubernetes de una forma totalmente automatizada que son Grafana y Prometheus.

Vamos a empezar con la explicación de la infraestructura más clásica. Para este despliegue haremos uso de dos módulos que están subidos al github:

  • Módulo de GKE[11].
  • Módulo de VPC[12].

Estos dos módulos están invocados en el `main.tf` del repositorio y hacen uso del provider de Google para el despliegue:


```tf
provider "google" {
  project = var.project_id
  region  = var.region
}

provider "google-beta" {
  project = var.project_id
  region  = var.region
}


module "network" {
  source = "git@github.com:lucasberlang/gcp-network.git?ref=v1.0.0"

  project_id         = var.project_id
  description        = var.description
  enable_nat_gateway = true
  offset             = 1

  intra_subnets = [
    {
      subnet_name           = "private-subnet01"
      subnet_ip_cidr        = "10.0.0.0/24"
      subnet_private_access = false
      subnet_region         = var.region
    }
  ]

  secondary_ranges = {
    private-subnet01 = [
      {
        range_name    = "private-subnet01-01"
        ip_cidr_range = var.ip_range_pods
      },
      {
        range_name    = "private-subnet01-02"
        ip_cidr_range = var.ip_range_services
      },
    ]
  }

  labels = var.labels
}

resource "google_storage_bucket" "gcs_starburst" {
  name          = var.name
  location      = "EU"
  force_destroy = var.force_destroy
}

module "gke-starburst" {
  source = "git@github.com:lucasberlang/gcp-gke.git?ref=v1.1.0"

  project_id              = var.project_id
  name                    = "starburst"
  regional                = true
  region                  = var.region
  network                 = module.network.network_name
  subnetwork              = "go-euw1-bt-stb-private-subnet01-dev"
  ip_range_pods           = "private-subnet01-01"
  ip_range_services       = "private-subnet01-02"
  enable_private_endpoint = false
  enable_private_nodes    = false
  master_ipv4_cidr_block  = "172.16.0.0/28"
  workload_identity       = false
  kubernetes_version      = var.kubernetes_version
  
  gce_persistent_disk_csi_driver = true

  master_authorized_networks = [
    {
      cidr_block   = module.network.intra_subnet_ips.0
      display_name = "VPC"
    },
    {
      cidr_block   = "0.0.0.0/0"
      display_name = "shell"
    }
  ]

  cluster_autoscaling = {
    enabled             = true,
    autoscaling_profile = "BALANCED",
    max_cpu_cores       = 300,
    max_memory_gb       = 940,
    min_cpu_cores       = 24,
    min_memory_gb       = 90,
  }


  node_pools = [
    {
      name         = "default-node-pool"
      machine_type = "e2-standard-16"
      auto_repair  = false
      auto_upgrade = false
    },
  ]
  
  node_labels = {
    "starburstpool" = "default-node-pool"
  }

  istio     = var.istio
  dns_cache = var.dns_cache
  labels    = var.labels
}
```
 

Lo único importante a tener en cuenta, es que vamos a desplegar una red con una única subred y que el cluster de GKE está habilitado con el autoescalado para poder incrementar el número de nodos cuando haya una carga de trabajo. Asimismo, es importante tener en cuenta que se ha añadido una etiqueta a todos los nodos que es `»starburstpool» = «default-node-pool»` para aislar el propio despliegue de Starburst del que más tarde haremos uso. Aparte de estos componentes también desplegamos una Cloud Storage para luego configurar el conector de Hive.

Por otra parte, como hemos comentado, también haremos el despliegue de Grafana y Prometheus. Para ello haremos uso del provider de Helm y de Kubernetes de Terraform. 

El despliegue de estos componentes lo tenemos en el archivo `helm.tf`:

```tf
resource "kubernetes_namespace" "prometheus" {
  metadata {
    name = "prometheus"
  }
}

resource "kubernetes_namespace" "grafana" {
  metadata {
    name = "grafana"
  }
}

resource "helm_release" "grafana" {
  chart      = "grafana"
  name       = "grafana"
  namespace  = kubernetes_namespace.grafana.metadata.0.name
  repository = "https://grafana.github.io/helm-charts"

  values = [
    file("templates/grafana.yaml")
  ]
}

resource "kubernetes_secret" "grafana-secrets" {
  metadata {
    name      = "grafana-credentials"
    namespace = kubernetes_namespace.grafana.metadata.0.name
  }
  data = {
    adminUser     = "admin"
    adminPassword = "admin"
  }
}

resource "helm_release" "prometheus" {
  chart      = "prometheus"
  name       = "prometheus"
  namespace  = kubernetes_namespace.prometheus.metadata.0.name
  repository = "https://prometheus-community.github.io/helm-charts"

  values = [
    file("templates/prometheus.yaml")
  ]
}
```
 

Hay varias cosas que tenemos que tener en cuenta, estas son las configuraciones que hemos añadido en los values de cada chart. 

Primero vamos con los valores de Prometheus que hemos configurado. Hemos añadido una configuración extra para que recoja las métricas de Starburst una vez que se levante. Esto lo hemos hecho en la siguiente parte de la configuración:

```yaml
extraScrapeConfigs: |
  - job_name: starburst-monitor
    scrape_interval: 5s
    static_configs:
      - targets: 
        - 'prometheus-coordinator-starburst-enterprise.default.svc.cluster.local:8081'
        - 'prometheus-worker-starburst-enterprise.default.svc.cluster.local:8081'
    metrics_path: /metrics
    scheme: http
```
 

Lo único a tener en cuenta son los targets que hemos añadido, que básicamente son los servicios tanto del Coordinator como de los Workers de Starburst para que recoja todas las métricas.

En la parte de Grafana hemos añadido tanto la configuración de Prometheus, como un dashboard que hemos creado custom para Starburst. 

La configuración que hemos añadida es la siguiente:

```yaml
datasources:
 datasources.yaml:
   apiVersion: 1
   datasources:
   - name: Prometheus
     type: prometheus
     url: http://prometheus-server.prometheus.svc.cluster.local
     isDefault: true


dashboards:
  default:
    Starburst-cluster:
      gnetId: 18767
      revision: 1
      datasource: Prometheus
```
 

En la carpeta infra del repositorio de Github, podrás encontrar todo el código necesario para realizar dicho despliegue.

Instalación y configuración de Starburst

Una vez que tengamos toda la infraestructura levantada, vamos a proceder a desplegar Starburst en nuestro cluster de GKE. Para ello, vamos a desplegar estos componentes:

  • Postgres Database on Kubernetes
  • Hive Metastore Service
  • Starburst Enterprise

El servicio de Hive Mestastore es necesario para configurar el conector de Hive para así poder acceder o escribir a los datos que se guardan en Google Cloud Storage. Como backend de nuestro servicio de Metastore, vamos a desplegar un base de datos PostgreSQL, para así poder guardar toda la información de la metadata en esta base de datos. Además tendremos que configurar el servicio de Hive para pasarle las credenciales de Google Cloud y que tenga permisos para poder leer y escribir de GCS. Por lo tanto, vamos a proceder primero a declarar algunas variables de entorno que necesitaremos para descargar los charts del repositorio privado de Starburst y algunas variables de configuración más que necesitaremos para realizar el despliegue.

Esta serían las variables que vamos a necesitar en nuestro despliegue:

```bash
export admin_usr=     # Choose an admin user name you will use to login to Starburst & Ranger. Do NOT use 'admin'
export admin_pwd=     # Choose an admin password you will use to login to Starburst & Ranger. MUST be a minimum of 8 characters and contain at least one uppercase, lowercase and numeric value.

export registry_pwd= #Credentials harbor registry
export registry_usr= #Credentials harbor registry
export starburst_license=starburstdata.license #License Starburst
# Zone where the cluster will be deployed. e.g. us-east4-b
export zone="europe-west1"
# Google Cloud Project ID where the cluster is being deployed
export google_cloud_project=
# Google Service account name. The service account is used to access services like GCS and BigQuery, so you should ensure that it has the relevant permissions for these
# Give your cluster a name
export cluster_name=

# These next values are automatically set based on your input values
# We'll automatically get the domain for the zone you are selecting. Comment this out if you don't need DNS
#export google_cloud_dns_zone_name=$(gcloud dns managed-zones describe ${google_cloud_dns_zone:?Zone not set} --project ${google_cloud_project_dns:?Project ID not set} | grep dnsName | awk '{ print $2 }' | sed 's/.$//g')

# This is the public URL to access Starburst
export starburst_url=${cluster_name:?Cluster Name not set}-starburst.${google_cloud_dns_zone_name}
# This is the public URL to access Ranger
export ranger_url=${cluster_name:?Cluster Name not set}-ranger.${google_cloud_dns_zone_name}

# Insights DB details
# These are the defaults if you choose to deploy your postgresDB to the K8s cluster
# You can adjust these to connect to an external DB, but be advised that the nodes in the K8s cluster must have access to the URL
export database_connection_url=jdbc:postgresql://postgresql:5432/insights
export database_username=
export database_password=

# Data Products. Leave the password unset as below, if you are connecting directly to the coordinator on port 8080
export data_products_enabled=true
export data_products_jdbc_url=jdbc:trino://coordinator:8080
export data_products_username=${admin_usr}
export data_products_password=

# Starburst Access Control
export starburst_access_control_enabled=true
export starburst_access_control_authorized_users=${admin_usr}

# These last remaining values are static
export xtra_args_hive="--set objectStorage.gs.cloudKeyFileSecret=service-account-key"
export xtra_args_starburst="--values starburst.catalog.yaml"
export xtra_args_ranger=""
```
 

Una vez definidas nuestras variables de entorno procederemos a crearnos un secreto de Kubernetes para configurar las credenciales con las que Hive se va a conectar a GCS.

```bash
kubectl create secret generic service-account-key --from-file key.json
```
 

Para ello, como paso previo, nos hemos creado una service account con permisos en Cloud Storage y en Bigquery y nos hemos descargado las credenciales de esa service account. También como paso previo, añadiremos los repositorio de Helm con el siguiente comando:

```bash
helm repo add --username ${registry_usr} --password ${registry_pwd} starburstdata https://harbor.starburstdata.net/chartrepo/starburstdata
helm repo add bitnami https://charts.bitnami.com/bitnami
```
 

Una vez que tenemos la configuración previa hecha, vamos a proceder a desplegar el servicio de PostgreSQL primero, y posteriormente, el Hive Metastore. Para ello haremos uso de Helm. Para el despliegue de PostgreSQL usaremos el siguiente comando:

```bash
helm upgrade postgres bitnami/postgresql --install --values postgres.yaml \
    --version 12.1.6 \
    --set primary.nodeSelector.starburstpool=default-node-pool \
    --set readReplicas.nodeSelector.starburstpool=default-node-pool
```
 

Hay varios factores a tener en cuenta en el comando anterior. El primero es que el despliegue de PostgreSQL lo haremos en los nodos que tengan el tag `starburstpool=default-node-pool`, que es nuestro worker pool por defecto. Usaremos la versión 12.1.6 de PostgreSQL y la configuración que hemos añadido en postgres es la siguiente:

```yaml
fullnameOverride: postgresql

global:
  postgresql:
    auth:
      database: postgres
      username: postgres
      postgresPassword: ****
  storageClass: "standard"
primary:
  initdb:
    scripts:
      init.sql: |
        create database hive;
        create database ranger;
        create database insights;
        create database datacache;

service:
  type: ClusterIP
``` 

Esta información se encuentra en el archivo `postgres.yaml` y nos configurará el usuario y contraseña de PostgreSQL, y nos creará 4 bases de datos que usa internamente Starburst como backend. En nuestro caso, como podéis observar, hemos configurado el servicio de backend en el mismo cluster que la configuración de Starburst, pero esto se puede configurar fuera del cluster de Kubernetes para entornos productivos. Básicamente podríamos tener un servicio gestionado como es Cloud Sql para así evitar problemas en producción.

Ahora vamos a proceder con el despliegue del servicio de Hive Metastore, esto lo haremos con el siguiente comando:

```bash
helm upgrade hive starburstdata/starburst-hive --install --values hive.yaml \
    --set registryCredentials.username=${registry_usr:?Value not set} \
    --set registryCredentials.password=${registry_pwd:?Value not set} \
    --set nodeSelector.starburstpool=default-node-pool  \
    --set objectStorage.gs.cloudKeyFileSecret=service-account-key
``` 

Aquí tenemos que tener en cuenta varias cosas importantes, la primera es que como en el servicio de PostgreSQL el despliegue se va a realizar en los nodos con el tag `starburstpool=default-node-pool`. El segundo punto importante es que hemos realizado la configuración de las credenciales de Google para que funcione el conector de hive, esto lo hemos realizado con el siguiente comando:

`--set objectStorage.gs.cloudKeyFileSecret=service-account-key` 

Con esta acción,  montamos el fichero de credenciales como un archivo en el despliegue de Hive para que tenga visibilidad en las credenciales. Los valores extras que hemos añadido a la configuración de hive se encuentran en el archivo `hive.yaml` y son los siguientes:

```yaml
database:
  external:
    driver: org.postgresql.Driver
    jdbcUrl: jdbc:postgresql://postgresql:5432/hive
    user: #user postgres
    password: #password postgres
  type: external

expose:
  type: clusterIp

image:
  repository: harbor.starburstdata.net/starburstdata/hive

registryCredentials:
  enabled: true
  registry: harbor.starburstdata.net/starburstdata
```
 

Una vez que tenemos desplegado tanto el servicio de Postgres como el de Hive Metastore, podemos proceder a desplegar Starburst. Primero necesitaremos realizar una serie de pasos previos. El primero será crearnos un secreto de Kubernetes con la licencia de Starburst, el segundo será crearnos un secreto con las variables de entornos que hemos definido antes, esto lo haremos con un pequeño script para quitar complejidad y que nos coja las variables que ya hemos definido. 

Con el siguiente comando procederemos a realizar los pasos anteriores:

```bash
kubectl create secret generic starburst --from-file ${starburst_license}
chmod 755 load_secrets.sh && . ./load_secrets.sh
kubectl apply -f secrets.yaml
```
 

Una vez que tenemos las configuraciones previas vamos a proceder a desplegar Starburst con el siguiente comando:

```bash
helm upgrade starburst-enterprise starburstdata/starburst-enterprise --install --values starburst.yaml \
    --set sharedSecret="$(openssl rand 64 | base64)" \
    --set coordinator.resources.requests.memory=$(echo $(( $(kubectl get nodes --selector='starburstpool=default-node-pool' -o jsonpath='{.items[0].status.allocatable.memory}' | awk -F "Ki" '{ print $1 }')*10/100 ))Ki) \
    --set coordinator.resources.requests.cpu=$(echo $(( $(kubectl get nodes --selector='starburstpool=default-node-pool' -o jsonpath='{.items[0].status.allocatable.cpu}' | awk -F "m" '{ print $1 }')*10/100 ))m) \
    --set coordinator.resources.limits.memory=$(echo $(( $(kubectl get nodes --selector='starburstpool=default-node-pool' -o jsonpath='{.items[0].status.allocatable.memory}' | awk -F "Ki" '{ print $1 }')*10/100 ))Ki) \
    --set coordinator.resources.limits.cpu=$(echo $(( $(kubectl get nodes --selector='starburstpool=default-node-pool' -o jsonpath='{.items[0].status.allocatable.cpu}' | awk -F "m" '{ print $1 }')*10/100 ))m) \
    --set worker.resources.requests.memory=$(echo $(( $(kubectl get nodes --selector='starburstpool=default-node-pool' -o jsonpath='{.items[0].status.allocatable.memory}' | awk -F "Ki" '{ print $1 }') - 10500000 ))Ki) \
    --set worker.resources.requests.cpu=$(echo $(( $(kubectl get nodes --selector='starburstpool=default-node-pool' -o jsonpath='{.items[0].status.allocatable.cpu}' | awk -F "m" '{ print $1 }') - 3500 ))m) \
    --set worker.resources.limits.memory=$(echo $(( $(kubectl get nodes --selector='starburstpool=default-node-pool' -o jsonpath='{.items[0].status.allocatable.memory}' | awk -F "Ki" '{ print $1 }') - 10500000 ))Ki) \
    --set worker.resources.limits.cpu=$(echo $(( $(kubectl get nodes --selector='starburstpool=default-node-pool' -o jsonpath='{.items[0].status.allocatable.cpu}' | awk -F "m" '{ print $1 }') - 3500 ))m) \
    --set coordinator.nodeSelector.starburstpool=default-node-pool 
```
 

Aquí como podéis observar, hay varias cosas a tener en cuenta. La primera es que todos los componentes de Starburst que se despliegan lo hacen en los nodos con el tag `starburstpool=default-node-pool`. Esto simplemente lo hemos hecho para quitar complejidad a la demo.  En entornos productivos, una buena práctica sería tener un nodepool para el Coordinator y otro nodepool para los Workers de Starburst.

Otra cosa a tener en cuenta es la configuración de la memoria y cpu que se hace tanto en los Workers como en el Coordinator. Como buenas prácticas, Starburst recomienda que haya un pod worker por cada nodo que se despliega en nuestro cluster de Kubernetes. Para ello lo que hemos hecho es ajustar la memoria y cpu de nuestros pods al tamaño de máquina que tenemos. Por último están los valores de configuración que hemos utilizado en el despliegue de Starburst, estos se pueden encontrar en el archivo `starburst.yaml` y son los siguientes:

```yaml
catalogs:
  hive: |
    connector.name=hive
    hive.security=starburst
    hive.metastore.uri=thrift://hive:9083
    hive.gcs.json-key-file-path=/gcs-keyfile/key.json
    hive.gcs.use-access-token=false
  postgres: |
    connector.name=postgresql
    connection-url=jdbc:postgresql://postgresql:5432/insights
    connection-user=******
    connection-password=******
  bigquery: |
      connector.name=bigquery
      bigquery.project-id=******
      bigquery.credentials-file=/gcs-keyfile/key.json
prometheus:
  enabled: true
  agent:
    version: "0.16.1"
    port: 8081
    config: "/etc/starburst/telemetry/prometheus.yaml"
  rules:
    - pattern: trino.execution<name=QueryManager><>(running_queries|queued_queries)
      name: $1
      attrNameSnakeCase: true
      type: GAUGE
    - pattern: 'trino.execution<name=QueryManager><>FailedQueries\.TotalCount'
      name: 'starburst_failed_queries'
      type: COUNTER
    - pattern: 'trino.execution<name=QueryManager><>(running_queries)'
      name: 'starburst_running_queries'
    - pattern: 'trino.execution<name=QueryManager><>StartedQueries\.FiveMinute\.Count'
      name: 'starburst_started_queries'
    - pattern: 'trino.execution<name=SqlTaskManager><>InputPositions\.FiveMinute\.Count'
      name: 'starburst_input_rows'
    - pattern: 'trino.execution<name=SqlTaskManager><>InputDataSize\.FiveMinute\.Count'
      name: 'starburst_input_data_bytes'
    - pattern: 'trino.execution<name=QueryManager><>UserErrorFailures\.FiveMinute\.Count'
      name: 'starburst_failed_queries_user'
    - pattern: 'trino.execution<name=QueryManager><>ExecutionTime\.FiveMinutes\.P50'
      name: 'starburst_latency_p50'
    - pattern: 'trino.execution<name=QueryManager><>WallInputBytesRate\.FiveMinutes\.P90'
      name: 'starburst_latency_p90'
    - pattern: 'trino.failuredetector<name=HeartbeatFailureDetector><>ActiveCount'
      name: 'starburst_active_node'
    - pattern: 'trino.memory<type=ClusterMemoryPool, name=general><>FreeDistributedBytes'
      name: 'starburst_free_memory_pool'
    - pattern: 'trino.memory<name=ClusterMemoryManager><>QueriesKilledDueToOutOfMemory'
      name: 'starburst_queries_killed_due_to_out_of_memory'
    - pattern: 'java.lang<type=Memory><HeapMemoryUsage>committed'
      name: 'starburst_heap_size_usage'
    - pattern: 'java.lang<type=Threading><>ThreadCount'
      name: 'starburst_thread_count'
coordinator:
  envFrom:
  - secretRef:
      name: environment-vars
  additionalProperties: |
    starburst.data-product.enabled=${ENV:data_products_enabled}
    data-product.starburst-jdbc-url=${ENV:data_products_jdbc_url}
    data-product.starburst-user=${ENV:data_products_username}
    data-product.starburst-password=
    query.max-memory=1PB
    starburst.access-control.enabled=${ENV:starburst_access_control_enabled}
    starburst.access-control.authorized-users=${ENV:starburst_access_control_authorized_users}
  etcFiles:
    properties:
      config.properties: |
        coordinator=true
        node-scheduler.include-coordinator=false
        http-server.http.port=8080
        discovery-server.enabled=true
        discovery.uri=http://localhost:8080
        usage-metrics.cluster-usage-resource.enabled=true
        http-server.authentication.allow-insecure-over-http=true
        web-ui.enabled=true
        http-server.process-forwarded=true
        insights.persistence-enabled=true
        insights.metrics-persistence-enabled=true
        insights.jdbc.url=${ENV:database_connection_url}
        insights.jdbc.user=${ENV:database_username}
        insights.jdbc.password=${ENV:database_password}
      password-authenticator.properties: |
        password-authenticator.name=file
  nodeSelector:
    starburstpool: default-node-pool
  resources:
    limits:
      cpu: 2
      memory: 12Gi
    requests:
      cpu: 2
      memory: 12Gi

expose:
  type: clusterIp
  ingress:
    serviceName: starburst
    servicePort: 8080
    host: 
    path: "/"
    pathType: Prefix
    tls:
      enabled: true
      secretName: tls-secret-starburst
    annotations:
      kubernetes.io/ingress.class: nginx
      cert-manager.io/cluster-issuer: letsencrypt

registryCredentials:
  enabled: true
  password: ******
  registry: harbor.starburstdata.net/starburstdata
  username: ******

starburstPlatformLicense: starburst

userDatabase:
  enabled: true
  users:
  - password: ******
    username: ******

worker:
  envFrom:
  - secretRef:
      name: environment-vars
  autoscaling:
    enabled: true
    maxReplicas: 10
    minReplicas: 3
    targetCPUUtilizationPercentage: 40
  deploymentTerminationGracePeriodSeconds: 30
  nodeSelector:
    starburstpool: default-node-pool
  resources:
    limits:
      cpu: 8
      memory: 40Gi
    requests:
      cpu: 8
      memory: 40Gi
  starburstWorkerShutdownGracePeriodSeconds: 120
  tolerations:
    - key: "kubernetes.azure.com/scalesetpriority"
      operator: "Exists"
      effect: "NoSchedule"

additionalVolumes:
  - path: /gcs-keyfile/key.json
    subPath: key.json
    volume:
      configMap:
        name: "sa-key"
```
 

En esta configuración hay varios valores a tener en cuenta, como son catalogs, prometheus, worker y additionalVolumes.

Vamos a empezar explicando la parte de catalogs. Para los que no lo sepan, un catálogo en Starburst es la configuración que permite acceder a unas fuentes de datos determinadas. Cada clúster de Starburst puede tener configurados múltiples catálogos y, por tanto, permitir el acceso a diversas fuentes de datos. En nuestro caso hemos definido el catálogo de Hive, PostgreSQL y Bigquery para poder acceder a dichas fuentes de datos:

```yaml
catalogs:
  hive: |
    connector.name=hive
    hive.security=starburst
    hive.metastore.uri=thrift://hive:9083
    hive.gcs.json-key-file-path=/gcs-keyfile/key.json
    hive.gcs.use-access-token=false
  postgres: |
    connector.name=postgresql
    connection-url=jdbc:postgresql://postgresql:5432/insights
    connection-user=******
    connection-password=******
  bigquery: |
      connector.name=bigquery
      bigquery.project-id=******
      bigquery.credentials-file=/gcs-keyfile/key.json
```
 

La segunda configuración a tener en cuenta es la de Prometheus, esto lo realizamos para exponer ciertas métricas a Prometheus y poder sacar información relevante en un dashboard de Grafana. Para ello tenemos la siguiente configuración:

```yaml
prometheus:
  enabled: true
  agent:
    version: "0.16.1"
    port: 8081
    config: "/etc/starburst/telemetry/prometheus.yaml"
  rules:
    - pattern: trino.execution<name=QueryManager><>(running_queries|queued_queries)
      name: $1
      attrNameSnakeCase: true
      type: GAUGE
    - pattern: 'trino.execution<name=QueryManager><>FailedQueries\.TotalCount'
      name: 'starburst_failed_queries'
      type: COUNTER
    - pattern: 'trino.execution<name=QueryManager><>(running_queries)'
      name: 'starburst_running_queries'
    - pattern: 'trino.execution<name=QueryManager><>StartedQueries\.FiveMinute\.Count'
      name: 'starburst_started_queries'
    - pattern: 'trino.execution<name=SqlTaskManager><>InputPositions\.FiveMinute\.Count'
      name: 'starburst_input_rows'
    - pattern: 'trino.execution<name=SqlTaskManager><>InputDataSize\.FiveMinute\.Count'
      name: 'starburst_input_data_bytes'
    - pattern: 'trino.execution<name=QueryManager><>UserErrorFailures\.FiveMinute\.Count'
      name: 'starburst_failed_queries_user'
    - pattern: 'trino.execution<name=QueryManager><>ExecutionTime\.FiveMinutes\.P50'
      name: 'starburst_latency_p50'
    - pattern: 'trino.execution<name=QueryManager><>WallInputBytesRate\.FiveMinutes\.P90'
      name: 'starburst_latency_p90'
    - pattern: 'trino.failuredetector<name=HeartbeatFailureDetector><>ActiveCount'
      name: 'starburst_active_node'
    - pattern: 'trino.memory<type=ClusterMemoryPool, name=general><>FreeDistributedBytes'
      name: 'starburst_free_memory_pool'
    - pattern: 'trino.memory<name=ClusterMemoryManager><>QueriesKilledDueToOutOfMemory'
      name: 'starburst_queries_killed_due_to_out_of_memory'
    - pattern: 'java.lang<type=Memory><HeapMemoryUsage>committed'
      name: 'starburst_heap_size_usage'
    - pattern: 'java.lang<type=Threading><>ThreadCount'
      name: 'starburst_thread_count'
```
 

En la configuración de los workers, vamos a activar el autoescalado de estos pods. Para ello vamos a realizar una configuración para que haya un mínimo de 3 pods workers que se traducirán en 3 nodos en nuestro cluster de GKE y un máximo de 10 pods. Para el autoescalado vamos a usar la métrica de consumo de CPU. 

Los valores son los siguientes:

```yaml
worker:
  envFrom:
  - secretRef:
      name: environment-vars
  autoscaling:
    enabled: true
    maxReplicas: 10
    minReplicas: 3
    targetCPUUtilizationPercentage: 40
```
 

Por último, añadiremos un volumen adicional a nuestro despliegue para poder montar las credenciales de Google cloud tanto en el coordinator como en los workers.

Esto lo haremos de la siguiente forma:

```yaml
additionalVolumes:
  - path: /gcs-keyfile/key.json
    subPath: key.json
    volume:
      configMap:
        name: "sa-key"
```
 

Con todos estos pasos, tendríamos nuestro cluster de Starburst ya operativo.

Consultas en GCP y autoescalado de Starburst

Una vez realizado el levantamiento del cluster de Starburst, vamos a realizar algunas consultas para probar su rendimiento y funcionamiento. Para ello vamos a realizar consultas de lectura en el esquema de TPCH[13] y después vamos a escribir la salida de estas consultas en el bucket de Google que hemos creado en los pasos de despliegue. 

Las consultas que vamos a ejecutar se encuentran en la carpeta de queries en los archivos `tpch.sql` y `gcs_storage.sql`.

Para lanzar las consultas será tan sencillo como irnos al apartado de consultas de la interfaz web y ejecutar las primeras consultas del archivo `tpch.sql`:

```sql
 CREATE SCHEMA hive.logistic WITH (location = 'gs://starburst-bluetab-test/logistic');

CREATE VIEW "hive"."logistic"."shipping_priority" SECURITY DEFINER AS
SELECT
  l.orderkey
, SUM((l.extendedprice * (1 - l.discount))) revenue
, o.orderdate
, o.shippriority
FROM
  tpch.tiny.customer c
, tpch.tiny.orders o
, tpch.tiny.lineitem l
WHERE ((c.mktsegment = 'BUILDING') AND (c.custkey = o.custkey) AND (l.orderkey = o.orderkey))
GROUP BY l.orderkey, o.orderdate, o.shippriority
ORDER BY revenue DESC, o.orderdate ASC;


CREATE VIEW "hive"."logistic"."minimum_cost_supplier" SECURITY DEFINER AS
SELECT
  s.acctbal
, s.name SupplierName
, n.name Nation
, p.partkey
, p.mfgr
, s.address
, s.phone
, s.comment
FROM
  tpch.tiny.part p
, tpch.tiny.supplier s
, tpch.tiny.partsupp ps
, tpch.tiny.nation n
, tpch.tiny.region r
WHERE ((p.partkey = ps.partkey) AND (s.suppkey = ps.suppkey) AND (p.size = 15) AND (p.type LIKE '%BRASS') AND (s.nationkey = n.nationkey) AND (n.regionkey = r.regionkey) AND (r.name = 'EUROPE') AND (ps.supplycost = (SELECT MIN(ps.supplycost)
FROM
  tpch.tiny.partsupp ps
, tpch.tiny.supplier s
, tpch.tiny.nation n
, tpch.tiny.region r
WHERE ((p.partkey = ps.partkey) AND (s.suppkey = ps.suppkey) AND (s.nationkey = n.nationkey) AND (n.regionkey = r.regionkey) AND (r.name = 'EUROPE'))
)))
ORDER BY s.acctbal DESC, n.name ASC, s.name ASC, p.partkey ASC;



select
  cst.name as CustomerName,
  cst.address,
  cst.phone,
  cst.nationkey,
  cst.acctbal as BookedOrders,
  cst.mktsegment,
  nat.name as Nation,
  reg.name as Region
from tpch.sf1.customer as cst
join tpch.sf1.nation as nat on nat.nationkey = cst.nationkey
join tpch.sf1.region as reg on reg.regionkey = nat.regionkey
where reg.regionkey = 1;

select
  nat.name as Nation,
  avg(cst.acctbal) as average_booking
from tpch.sf100.customer as cst
join tpch.sf100.nation as nat on nat.nationkey = cst.nationkey
join tpch.sf100.region as reg on reg.regionkey = nat.regionkey
where reg.regionkey = 1
group by nat.name;
```
 

En estas pruebas crearemos una serie de vistas y haremos unos selects con varios cruces sobre las tablas de customer(15000000 rows), nation(25 rows) y region(5 rows) del esquema sf100 para comprobar que todo funciona correctamente y ver que tenemos nuestra plataforma operativa. Una vez comprobado que todo es correcto, probaremos a escribir algunos resultados en el bucket que hemos creado.

Para ello lanzaremos las consultas que se encuentran en el archivo `gcs_storage.sql`:

{"type":"elementor","siteurl":"https://bluetab.net/es/wp-json/","elements":[{"id":"1a82503","elType":"widget","isInner":false,"isLocked":false,"settings":{"code_language":"python","code_block":"```sql\n CREATE SCHEMA hive.logistic WITH (location = 'gs://starburst-bluetab-test/logistic');\n\nCREATE VIEW \"hive\".\"logistic\".\"shipping_priority\" SECURITY DEFINER AS\nSELECT\n  l.orderkey\n, SUM((l.extendedprice * (1 - l.discount))) revenue\n, o.orderdate\n, o.shippriority\nFROM\n  tpch.tiny.customer c\n, tpch.tiny.orders o\n, tpch.tiny.lineitem l\nWHERE ((c.mktsegment = 'BUILDING') AND (c.custkey = o.custkey) AND (l.orderkey = o.orderkey))\nGROUP BY l.orderkey, o.orderdate, o.shippriority\nORDER BY revenue DESC, o.orderdate ASC;\n\n\nCREATE VIEW \"hive\".\"logistic\".\"minimum_cost_supplier\" SECURITY DEFINER AS\nSELECT\n  s.acctbal\n, s.name SupplierName\n, n.name Nation\n, p.partkey\n, p.mfgr\n, s.address\n, s.phone\n, s.comment\nFROM\n  tpch.tiny.part p\n, tpch.tiny.supplier s\n, tpch.tiny.partsupp ps\n, tpch.tiny.nation n\n, tpch.tiny.region r\nWHERE ((p.partkey = ps.partkey) AND (s.suppkey = ps.suppkey) AND (p.size = 15) AND (p.type LIKE '%BRASS') AND (s.nationkey = n.nationkey) AND (n.regionkey = r.regionkey) AND (r.name = 'EUROPE') AND (ps.supplycost = (SELECT MIN(ps.supplycost)\nFROM\n  tpch.tiny.partsupp ps\n, tpch.tiny.supplier s\n, tpch.tiny.nation n\n, tpch.tiny.region r\nWHERE ((p.partkey = ps.partkey) AND (s.suppkey = ps.suppkey) AND (s.nationkey = n.nationkey) AND (n.regionkey = r.regionkey) AND (r.name = 'EUROPE'))\n)))\nORDER BY s.acctbal DESC, n.name ASC, s.name ASC, p.partkey ASC;\n\n\n\nselect\n  cst.name as CustomerName,\n  cst.address,\n  cst.phone,\n  cst.nationkey,\n  cst.acctbal as BookedOrders,\n  cst.mktsegment,\n  nat.name as Nation,\n  reg.name as Region\nfrom tpch.sf1.customer as cst\njoin tpch.sf1.nation as nat on nat.nationkey = cst.nationkey\njoin tpch.sf1.region as reg on reg.regionkey = nat.regionkey\nwhere reg.regionkey = 1;\n\nselect\n  nat.name as Nation,\n  avg(cst.acctbal) as average_booking\nfrom tpch.sf100.customer as cst\njoin tpch.sf100.nation as nat on nat.nationkey = cst.nationkey\njoin tpch.sf100.region as reg on reg.regionkey = nat.regionkey\nwhere reg.regionkey = 1\ngroup by nat.name;\n```\n","_title":"","_margin":{"unit":"px","top":"","right":"","bottom":"","left":"","isLinked":true},"_margin_tablet":{"unit":"px","top":"","right":"","bottom":"","left":"","isLinked":true},"_margin_mobile":{"unit":"px","top":"","right":"","bottom":"","left":"","isLinked":true},"_padding":{"unit":"px","top":"","right":"","bottom":"","left":"","isLinked":true},"_padding_tablet":{"unit":"px","top":"","right":"","bottom":"","left":"","isLinked":true},"_padding_mobile":{"unit":"px","top":"","right":"","bottom":"","left":"","isLinked":true},"_element_width":"","_element_width_tablet":"","_element_width_mobile":"","_element_custom_width":{"unit":"%","size":"","sizes":[]},"_element_custom_width_tablet":{"unit":"px","size":"","sizes":[]},"_element_custom_width_mobile":{"unit":"px","size":"","sizes":[]},"_element_vertical_align":"","_element_vertical_align_tablet":"","_element_vertical_align_mobile":"","_position":"","_offset_orientation_h":"start","_offset_x":{"unit":"px","size":"0","sizes":[]},"_offset_x_tablet":{"unit":"px","size":"","sizes":[]},"_offset_x_mobile":{"unit":"px","size":"","sizes":[]},"_offset_x_end":{"unit":"px","size":"0","sizes":[]},"_offset_x_end_tablet":{"unit":"px","size":"","sizes":[]},"_offset_x_end_mobile":{"unit":"px","size":"","sizes":[]},"_offset_orientation_v":"start","_offset_y":{"unit":"px","size":"0","sizes":[]},"_offset_y_tablet":{"unit":"px","size":"","sizes":[]},"_offset_y_mobile":{"unit":"px","size":"","sizes":[]},"_offset_y_end":{"unit":"px","size":"0","sizes":[]},"_offset_y_end_tablet":{"unit":"px","size":"","sizes":[]},"_offset_y_end_mobile":{"unit":"px","size":"","sizes":[]},"_z_index":"","_z_index_tablet":"","_z_index_mobile":"","_element_id":"","_css_classes":"","motion_fx_motion_fx_scrolling":"","motion_fx_translateY_effect":"","motion_fx_translateY_direction":"","motion_fx_translateY_speed":{"unit":"px","size":4,"sizes":[]},"motion_fx_translateY_affectedRange":{"unit":"%","size":"","sizes":{"start":0,"end":100}},"motion_fx_translateX_effect":"","motion_fx_translateX_direction":"","motion_fx_translateX_speed":{"unit":"px","size":4,"sizes":[]},"motion_fx_translateX_affectedRange":{"unit":"%","size":"","sizes":{"start":0,"end":100}},"motion_fx_opacity_effect":"","motion_fx_opacity_direction":"out-in","motion_fx_opacity_level":{"unit":"px","size":10,"sizes":[]},"motion_fx_opacity_range":{"unit":"%","size":"","sizes":{"start":20,"end":80}},"motion_fx_blur_effect":"","motion_fx_blur_direction":"out-in","motion_fx_blur_level":{"unit":"px","size":7,"sizes":[]},"motion_fx_blur_range":{"unit":"%","size":"","sizes":{"start":20,"end":80}},"motion_fx_rotateZ_effect":"","motion_fx_rotateZ_direction":"","motion_fx_rotateZ_speed":{"unit":"px","size":1,"sizes":[]},"motion_fx_rotateZ_affectedRange":{"unit":"%","size":"","sizes":{"start":0,"end":100}},"motion_fx_scale_effect":"","motion_fx_scale_direction":"out-in","motion_fx_scale_speed":{"unit":"px","size":4,"sizes":[]},"motion_fx_scale_range":{"unit":"%","size":"","sizes":{"start":20,"end":80}},"motion_fx_transform_origin_x":"center","motion_fx_transform_origin_y":"center","motion_fx_devices":["desktop","tablet","mobile"],"motion_fx_range":"","motion_fx_motion_fx_mouse":"","motion_fx_mouseTrack_effect":"","motion_fx_mouseTrack_direction":"","motion_fx_mouseTrack_speed":{"unit":"px","size":1,"sizes":[]},"motion_fx_tilt_effect":"","motion_fx_tilt_direction":"","motion_fx_tilt_speed":{"unit":"px","size":4,"sizes":[]},"sticky":"","sticky_on":["desktop","tablet","mobile"],"sticky_offset":0,"sticky_offset_tablet":"","sticky_offset_mobile":"","sticky_effects_offset":0,"sticky_effects_offset_tablet":"","sticky_effects_offset_mobile":"","sticky_parent":"","_animation":"","_animation_tablet":"","_animation_mobile":"","animation_duration":"","_animation_delay":"","_transform_rotate_popover":"","_transform_rotateZ_effect":{"unit":"px","size":"","sizes":[]},"_transform_rotateZ_effect_tablet":{"unit":"deg","size":"","sizes":[]},"_transform_rotateZ_effect_mobile":{"unit":"deg","size":"","sizes":[]},"_transform_rotate_3d":"","_transform_rotateX_effect":{"unit":"px","size":"","sizes":[]},"_transform_rotateX_effect_tablet":{"unit":"deg","size":"","sizes":[]},"_transform_rotateX_effect_mobile":{"unit":"deg","size":"","sizes":[]},"_transform_rotateY_effect":{"unit":"px","size":"","sizes":[]},"_transform_rotateY_effect_tablet":{"unit":"deg","size":"","sizes":[]},"_transform_rotateY_effect_mobile":{"unit":"deg","size":"","sizes":[]},"_transform_perspective_effect":{"unit":"px","size":"","sizes":[]},"_transform_perspective_effect_tablet":{"unit":"px","size":"","sizes":[]},"_transform_perspective_effect_mobile":{"unit":"px","size":"","sizes":[]},"_transform_translate_popover":"","_transform_translateX_effect":{"unit":"px","size":"","sizes":[]},"_transform_translateX_effect_tablet":{"unit":"px","size":"","sizes":[]},"_transform_translateX_effect_mobile":{"unit":"px","size":"","sizes":[]},"_transform_translateY_effect":{"unit":"px","size":"","sizes":[]},"_transform_translateY_effect_tablet":{"unit":"px","size":"","sizes":[]},"_transform_translateY_effect_mobile":{"unit":"px","size":"","sizes":[]},"_transform_scale_popover":"","_transform_keep_proportions":"yes","_transform_scale_effect":{"unit":"px","size":"","sizes":[]},"_transform_scale_effect_tablet":{"unit":"px","size":"","sizes":[]},"_transform_scale_effect_mobile":{"unit":"px","size":"","sizes":[]},"_transform_scaleX_effect":{"unit":"px","size":"","sizes":[]},"_transform_scaleX_effect_tablet":{"unit":"px","size":"","sizes":[]},"_transform_scaleX_effect_mobile":{"unit":"px","size":"","sizes":[]},"_transform_scaleY_effect":{"unit":"px","size":"","sizes":[]},"_transform_scaleY_effect_tablet":{"unit":"px","size":"","sizes":[]},"_transform_scaleY_effect_mobile":{"unit":"px","size":"","sizes":[]},"_transform_skew_popover":"","_transform_skewX_effect":{"unit":"px","size":"","sizes":[]},"_transform_skewX_effect_tablet":{"unit":"deg","size":"","sizes":[]},"_transform_skewX_effect_mobile":{"unit":"deg","size":"","sizes":[]},"_transform_skewY_effect":{"unit":"px","size":"","sizes":[]},"_transform_skewY_effect_tablet":{"unit":"deg","size":"","sizes":[]},"_transform_skewY_effect_mobile":{"unit":"deg","size":"","sizes":[]},"_transform_flipX_effect":"","_transform_flipY_effect":"","_transform_rotate_popover_hover":"","_transform_rotateZ_effect_hover":{"unit":"px","size":"","sizes":[]},"_transform_rotateZ_effect_hover_tablet":{"unit":"deg","size":"","sizes":[]},"_transform_rotateZ_effect_hover_mobile":{"unit":"deg","size":"","sizes":[]},"_transform_rotate_3d_hover":"","_transform_rotateX_effect_hover":{"unit":"px","size":"","sizes":[]},"_transform_rotateX_effect_hover_tablet":{"unit":"deg","size":"","sizes":[]},"_transform_rotateX_effect_hover_mobile":{"unit":"deg","size":"","sizes":[]},"_transform_rotateY_effect_hover":{"unit":"px","size":"","sizes":[]},"_transform_rotateY_effect_hover_tablet":{"unit":"deg","size":"","sizes":[]},"_transform_rotateY_effect_hover_mobile":{"unit":"deg","size":"","sizes":[]},"_transform_perspective_effect_hover":{"unit":"px","size":"","sizes":[]},"_transform_perspective_effect_hover_tablet":{"unit":"px","size":"","sizes":[]},"_transform_perspective_effect_hover_mobile":{"unit":"px","size":"","sizes":[]},"_transform_translate_popover_hover":"","_transform_translateX_effect_hover":{"unit":"px","size":"","sizes":[]},"_transform_translateX_effect_hover_tablet":{"unit":"px","size":"","sizes":[]},"_transform_translateX_effect_hover_mobile":{"unit":"px","size":"","sizes":[]},"_transform_translateY_effect_hover":{"unit":"px","size":"","sizes":[]},"_transform_translateY_effect_hover_tablet":{"unit":"px","size":"","sizes":[]},"_transform_translateY_effect_hover_mobile":{"unit":"px","size":"","sizes":[]},"_transform_scale_popover_hover":"","_transform_keep_proportions_hover":"yes","_transform_scale_effect_hover":{"unit":"px","size":"","sizes":[]},"_transform_scale_effect_hover_tablet":{"unit":"px","size":"","sizes":[]},"_transform_scale_effect_hover_mobile":{"unit":"px","size":"","sizes":[]},"_transform_scaleX_effect_hover":{"unit":"px","size":"","sizes":[]},"_transform_scaleX_effect_hover_tablet":{"unit":"px","size":"","sizes":[]},"_transform_scaleX_effect_hover_mobile":{"unit":"px","size":"","sizes":[]},"_transform_scaleY_effect_hover":{"unit":"px","size":"","sizes":[]},"_transform_scaleY_effect_hover_tablet":{"unit":"px","size":"","sizes":[]},"_transform_scaleY_effect_hover_mobile":{"unit":"px","size":"","sizes":[]},"_transform_skew_popover_hover":"","_transform_skewX_effect_hover":{"unit":"px","size":"","sizes":[]},"_transform_skewX_effect_hover_tablet":{"unit":"deg","size":"","sizes":[]},"_transform_skewX_effect_hover_mobile":{"unit":"deg","size":"","sizes":[]},"_transform_skewY_effect_hover":{"unit":"px","size":"","sizes":[]},"_transform_skewY_effect_hover_tablet":{"unit":"deg","size":"","sizes":[]},"_transform_skewY_effect_hover_mobile":{"unit":"deg","size":"","sizes":[]},"_transform_flipX_effect_hover":"","_transform_flipY_effect_hover":"","_transform_transition_hover":{"unit":"px","size":"","sizes":[]},"motion_fx_transform_x_anchor_point":"","motion_fx_transform_x_anchor_point_tablet":"","motion_fx_transform_x_anchor_point_mobile":"","motion_fx_transform_y_anchor_point":"","motion_fx_transform_y_anchor_point_tablet":"","motion_fx_transform_y_anchor_point_mobile":"","_background_background":"","_background_color":"","_background_color_stop":{"unit":"%","size":0,"sizes":[]},"_background_color_b":"#f2295b","_background_color_b_stop":{"unit":"%","size":100,"sizes":[]},"_background_gradient_type":"linear","_background_gradient_angle":{"unit":"deg","size":180,"sizes":[]},"_background_gradient_position":"center center","_background_image":{"url":"","id":"","size":""},"_background_image_tablet":{"url":"","id":"","size":""},"_background_image_mobile":{"url":"","id":"","size":""},"_background_position":"","_background_position_tablet":"","_background_position_mobile":"","_background_xpos":{"unit":"px","size":0,"sizes":[]},"_background_xpos_tablet":{"unit":"px","size":0,"sizes":[]},"_background_xpos_mobile":{"unit":"px","size":0,"sizes":[]},"_background_ypos":{"unit":"px","size":0,"sizes":[]},"_background_ypos_tablet":{"unit":"px","size":0,"sizes":[]},"_background_ypos_mobile":{"unit":"px","size":0,"sizes":[]},"_background_attachment":"","_background_repeat":"","_background_repeat_tablet":"","_background_repeat_mobile":"","_background_size":"","_background_size_tablet":"","_background_size_mobile":"","_background_bg_width":{"unit":"%","size":100,"sizes":[]},"_background_bg_width_tablet":{"unit":"px","size":"","sizes":[]},"_background_bg_width_mobile":{"unit":"px","size":"","sizes":[]},"_background_video_link":"","_background_video_start":"","_background_video_end":"","_background_play_once":"","_background_play_on_mobile":"","_background_privacy_mode":"","_background_video_fallback":{"url":"","id":"","size":""},"_background_slideshow_gallery":[],"_background_slideshow_loop":"yes","_background_slideshow_slide_duration":5000,"_background_slideshow_slide_transition":"fade","_background_slideshow_transition_duration":500,"_background_slideshow_background_size":"","_background_slideshow_background_size_tablet":"","_background_slideshow_background_size_mobile":"","_background_slideshow_background_position":"","_background_slideshow_background_position_tablet":"","_background_slideshow_background_position_mobile":"","_background_slideshow_lazyload":"","_background_slideshow_ken_burns":"","_background_slideshow_ken_burns_zoom_direction":"in","_background_hover_background":"","_background_hover_color":"","_background_hover_color_stop":{"unit":"%","size":0,"sizes":[]},"_background_hover_color_b":"#f2295b","_background_hover_color_b_stop":{"unit":"%","size":100,"sizes":[]},"_background_hover_gradient_type":"linear","_background_hover_gradient_angle":{"unit":"deg","size":180,"sizes":[]},"_background_hover_gradient_position":"center center","_background_hover_image":{"url":"","id":"","size":""},"_background_hover_image_tablet":{"url":"","id":"","size":""},"_background_hover_image_mobile":{"url":"","id":"","size":""},"_background_hover_position":"","_background_hover_position_tablet":"","_background_hover_position_mobile":"","_background_hover_xpos":{"unit":"px","size":0,"sizes":[]},"_background_hover_xpos_tablet":{"unit":"px","size":0,"sizes":[]},"_background_hover_xpos_mobile":{"unit":"px","size":0,"sizes":[]},"_background_hover_ypos":{"unit":"px","size":0,"sizes":[]},"_background_hover_ypos_tablet":{"unit":"px","size":0,"sizes":[]},"_background_hover_ypos_mobile":{"unit":"px","size":0,"sizes":[]},"_background_hover_attachment":"","_background_hover_repeat":"","_background_hover_repeat_tablet":"","_background_hover_repeat_mobile":"","_background_hover_size":"","_background_hover_size_tablet":"","_background_hover_size_mobile":"","_background_hover_bg_width":{"unit":"%","size":100,"sizes":[]},"_background_hover_bg_width_tablet":{"unit":"px","size":"","sizes":[]},"_background_hover_bg_width_mobile":{"unit":"px","size":"","sizes":[]},"_background_hover_video_link":"","_background_hover_video_start":"","_background_hover_video_end":"","_background_hover_play_once":"","_background_hover_play_on_mobile":"","_background_hover_privacy_mode":"","_background_hover_video_fallback":{"url":"","id":"","size":""},"_background_hover_slideshow_gallery":[],"_background_hover_slideshow_loop":"yes","_background_hover_slideshow_slide_duration":5000,"_background_hover_slideshow_slide_transition":"fade","_background_hover_slideshow_transition_duration":500,"_background_hover_slideshow_background_size":"","_background_hover_slideshow_background_size_tablet":"","_background_hover_slideshow_background_size_mobile":"","_background_hover_slideshow_background_position":"","_background_hover_slideshow_background_position_tablet":"","_background_hover_slideshow_background_position_mobile":"","_background_hover_slideshow_lazyload":"","_background_hover_slideshow_ken_burns":"","_background_hover_slideshow_ken_burns_zoom_direction":"in","_background_hover_transition":{"unit":"px","size":"","sizes":[]},"_border_border":"","_border_width":{"unit":"px","top":"","right":"","bottom":"","left":"","isLinked":true},"_border_width_tablet":{"unit":"px","top":"","right":"","bottom":"","left":"","isLinked":true},"_border_width_mobile":{"unit":"px","top":"","right":"","bottom":"","left":"","isLinked":true},"_border_color":"","_border_radius":{"unit":"px","top":"","right":"","bottom":"","left":"","isLinked":true},"_border_radius_tablet":{"unit":"px","top":"","right":"","bottom":"","left":"","isLinked":true},"_border_radius_mobile":{"unit":"px","top":"","right":"","bottom":"","left":"","isLinked":true},"_box_shadow_box_shadow_type":"","_box_shadow_box_shadow":{"horizontal":0,"vertical":0,"blur":10,"spread":0,"color":"rgba(0,0,0,0.5)"},"_box_shadow_box_shadow_position":" ","_border_hover_border":"","_border_hover_width":{"unit":"px","top":"","right":"","bottom":"","left":"","isLinked":true},"_border_hover_width_tablet":{"unit":"px","top":"","right":"","bottom":"","left":"","isLinked":true},"_border_hover_width_mobile":{"unit":"px","top":"","right":"","bottom":"","left":"","isLinked":true},"_border_hover_color":"","_border_radius_hover":{"unit":"px","top":"","right":"","bottom":"","left":"","isLinked":true},"_border_radius_hover_tablet":{"unit":"px","top":"","right":"","bottom":"","left":"","isLinked":true},"_border_radius_hover_mobile":{"unit":"px","top":"","right":"","bottom":"","left":"","isLinked":true},"_box_shadow_hover_box_shadow_type":"","_box_shadow_hover_box_shadow":{"horizontal":0,"vertical":0,"blur":10,"spread":0,"color":"rgba(0,0,0,0.5)"},"_box_shadow_hover_box_shadow_position":" ","_border_hover_transition":{"unit":"px","size":"","sizes":[]},"_mask_switch":"","_mask_shape":"circle","_mask_image":{"url":"","id":"","size":""},"_mask_notice":"","_mask_size":"contain","_mask_size_tablet":"","_mask_size_mobile":"","_mask_size_scale":{"unit":"%","size":100,"sizes":[]},"_mask_size_scale_tablet":{"unit":"px","size":"","sizes":[]},"_mask_size_scale_mobile":{"unit":"px","size":"","sizes":[]},"_mask_position":"center center","_mask_position_tablet":"","_mask_position_mobile":"","_mask_position_x":{"unit":"%","size":0,"sizes":[]},"_mask_position_x_tablet":{"unit":"px","size":"","sizes":[]},"_mask_position_x_mobile":{"unit":"px","size":"","sizes":[]},"_mask_position_y":{"unit":"%","size":0,"sizes":[]},"_mask_position_y_tablet":{"unit":"px","size":"","sizes":[]},"_mask_position_y_mobile":{"unit":"px","size":"","sizes":[]},"_mask_repeat":"no-repeat","_mask_repeat_tablet":"","_mask_repeat_mobile":"","hide_desktop":"","hide_tablet":"","hide_mobile":"","_attributes":"","custom_css":""},"defaultEditSettings":{"defaultEditRoute":"content"},"elements":[],"widgetType":"elementor-syntax-highlighter","editSettings":{"defaultEditRoute":"content","panel":{"activeTab":"content","activeSection":"content_section"}},"htmlCache":"\t\t<div class=\"elementor-widget-container\">\n\t\t\t<pre><code class='language-python'>```sql\n CREATE SCHEMA hive.logistic WITH (location = 'gs://starburst-bluetab-test/logistic');\n\nCREATE VIEW &quot;hive&quot;.&quot;logistic&quot;.&quot;shipping_priority&quot; SECURITY DEFINER AS\nSELECT\n  l.orderkey\n, SUM((l.extendedprice * (1 - l.discount))) revenue\n, o.orderdate\n, o.shippriority\nFROM\n  tpch.tiny.customer c\n, tpch.tiny.orders o\n, tpch.tiny.lineitem l\nWHERE ((c.mktsegment = 'BUILDING') AND (c.custkey = o.custkey) AND (l.orderkey = o.orderkey))\nGROUP BY l.orderkey, o.orderdate, o.shippriority\nORDER BY revenue DESC, o.orderdate ASC;\n\n\nCREATE VIEW &quot;hive&quot;.&quot;logistic&quot;.&quot;minimum_cost_supplier&quot; SECURITY DEFINER AS\nSELECT\n  s.acctbal\n, s.name SupplierName\n, n.name Nation\n, p.partkey\n, p.mfgr\n, s.address\n, s.phone\n, s.comment\nFROM\n  tpch.tiny.part p\n, tpch.tiny.supplier s\n, tpch.tiny.partsupp ps\n, tpch.tiny.nation n\n, tpch.tiny.region r\nWHERE ((p.partkey = ps.partkey) AND (s.suppkey = ps.suppkey) AND (p.size = 15) AND (p.type LIKE '%BRASS') AND (s.nationkey = n.nationkey) AND (n.regionkey = r.regionkey) AND (r.name = 'EUROPE') AND (ps.supplycost = (SELECT MIN(ps.supplycost)\nFROM\n  tpch.tiny.partsupp ps\n, tpch.tiny.supplier s\n, tpch.tiny.nation n\n, tpch.tiny.region r\nWHERE ((p.partkey = ps.partkey) AND (s.suppkey = ps.suppkey) AND (s.nationkey = n.nationkey) AND (n.regionkey = r.regionkey) AND (r.name = 'EUROPE'))\n)))\nORDER BY s.acctbal DESC, n.name ASC, s.name ASC, p.partkey ASC;\n\n\n\nselect\n  cst.name as CustomerName,\n  cst.address,\n  cst.phone,\n  cst.nationkey,\n  cst.acctbal as BookedOrders,\n  cst.mktsegment,\n  nat.name as Nation,\n  reg.name as Region\nfrom tpch.sf1.customer as cst\njoin tpch.sf1.nation as nat on nat.nationkey = cst.nationkey\njoin tpch.sf1.region as reg on reg.regionkey = nat.regionkey\nwhere reg.regionkey = 1;\n\nselect\n  nat.name as Nation,\n  avg(cst.acctbal) as average_booking\nfrom tpch.sf100.customer as cst\njoin tpch.sf100.nation as nat on nat.nationkey = cst.nationkey\njoin tpch.sf100.region as reg on reg.regionkey = nat.regionkey\nwhere reg.regionkey = 1\ngroup by nat.name;\n```\n </code></pre><script>\nif (!document.getElementById('syntaxed-prism')) {\n\tvar my_awesome_script = document.createElement('script');\n\tmy_awesome_script.setAttribute('src','https://bluetab.net/wp-content/plugins/syntax-highlighter-for-elementor/assets/prism2.js');\n\tmy_awesome_script.setAttribute('id','syntaxed-prism');\n\tdocument.body.appendChild(my_awesome_script);\n} else {\n\twindow.Prism && Prism.highlightAll();\n}\n</script>\t\t</div>\n\t\t"}]} 

En esta prueba lo más relevante es que vamos a escribir los datos de la tablas customer(15000000 rows), orders(150000000 rows), supplier(1000000 rows), nation(25 rows) y region(5 rows) en nuestro bucket de GCS.

Como comentamos anteriormente, Starburst no solo es una herramienta que te permite lanzar consultas para analizar datos, sino que también te puede ayudar en las migraciones de datos de tu compañía, volcando la información de tu base de datos a tu plataforma de la nube. Una cosa muy importante a tener en cuenta es que Starburst te permite trabajar con distintos tipos de fichero, pudiendo escribir tus tablas finales en ORC, Parquet o formatos como Delta o Hudi dándote una libertad muy amplia en las migraciones al cloud.

Como última prueba para ver que todo está funcionando correctamente, vamos a lanzar una consulta para federar distintos datos de diversas fuentes. En nuestro caso, federaremos datos de la anterior tabla que hemos creado en Google Cloud Storage llamada customer con una tabla llamada nation, que nos crearemos en el PostgreSQL que hemos configurado en nuestro despliegue, y la tabla region que está en el esquema tcph. Esta consulta la podemos encontrar en el archivo `federate.sql`:

create schema postgres.logistic;
create table postgres.logistic.nation as select * from tpch.sf1.nation;

select
  cst.name as CustomerName,
  cst.address,
  cst.phone,
  cst.nationkey,
  cst.acctbal as BookedOrders,
  cst.mktsegment,
  nat.name as Nation,
  reg.name as Region
from hive.datalake.customer as cst
join postgres.logistic.nation as nat on nat.nationkey = cst.nationkey
join tpch.sf1.region as reg on reg.regionkey = nat.regionkey
where reg.regionkey = 1;
 

Este tipo de consultas es uno de los puntos fuertes que tiene Starburst, poder federar consultas que se encuentren en distintos silos de información sin la necesidad de migrar los datos y pudiendo atacar a distintos Cloud o a información que se tenga en el onpremise. 

Una vez que hemos probado que tanto las consultas como la escritura en GCS funcionan correctamente, vamos a realizar unos test de performance para simular usuarios en paralelo y ver como autoescala nuestra plataforma. Vamos a configurar JMeter para estas pruebas. Para ello hemos tenido que configurar el conector jdbc de trino para que mande consultas a nuestro cluster.

Vamos a simular 20 usuarios en paralelo, y cada uno lanzará una secuencia de 5 consultas. Esto significa que habrá 20 consultas en paralelo al mismo tiempo, simulando un escenario real, ya que generalmente no se lanzarán consultas de todos los usuarios en el mismo momento. Las consultas que vamos a ejecutar son las siguiente:

```sql
select
  cst.name as CustomerName,
  cst.address,
  cst.phone,
  cst.nationkey,
  cst.acctbal as BookedOrders,
  cst.mktsegment,
  nat.name as Nation,
  reg.name as Region
from tpch.sf1.customer as cst
join tpch.sf1.nation as nat on nat.nationkey = cst.nationkey
join tpch.sf1.region as reg on reg.regionkey = nat.regionkey
where reg.regionkey = 1;

SELECT
  s.acctbal
, s.name SupplierName
, n.name Nation
, p.partkey
, p.mfgr
, s.address
, s.phone
, s.comment
FROM
  tpch.tiny.part p
, tpch.tiny.supplier s
, tpch.tiny.partsupp ps
, tpch.tiny.nation n
, tpch.tiny.region r
WHERE ((p.partkey = ps.partkey) AND (s.suppkey = ps.suppkey) AND (p.size = 15) AND (p.type LIKE '%BRASS') AND (s.nationkey = n.nationkey) AND (n.regionkey = r.regionkey) AND (r.name = 'EUROPE') AND (ps.supplycost = (SELECT MIN(ps.supplycost)
FROM
  tpch.tiny.partsupp ps
, tpch.tiny.supplier s
, tpch.tiny.nation n
, tpch.tiny.region r
WHERE ((p.partkey = ps.partkey) AND (s.suppkey = ps.suppkey) AND (s.nationkey = n.nationkey) AND (n.regionkey = r.regionkey) AND (r.name = 'EUROPE'))
)))
ORDER BY s.acctbal DESC, n.name ASC, s.name ASC, p.partkey ASC;

SELECT
count(*)
FROM
  tpch.sf1.customer c
, tpch.sf1.orders o
, tpch.sf1.lineitem l
WHERE ((c.mktsegment = 'BUILDING') AND (c.custkey = o.custkey) AND (l.orderkey = o.orderkey))
GROUP BY l.orderkey, o.orderdate, o.shippriority
ORDER BY o.orderdate ASC;
```
 

Si nos fijamos, en nuestro cluster de Kubernetes podemos ver que se están levantando más workers de Starburst por el momento de alta demanda en nuestra simulación:

Esto es una de las características más cómodas e importantes que nos da Starburst, ya que hace que nuestra plataforma de analítica de datos sea 100% elástica y podamos ir adaptándonos a los picos de demanda que tengamos.

Métricas

Por último, Starburst nos proporciona una interfaz donde visualizar ciertas métricas del consumo de nuestro cluster, como puede ser la memoria, la cpu o las consultas realizadas en tiempo real en nuestro cluster.

Además de estas métricas, hemos añadido también a nuestra configuración el despliegue de Prometheus y Grafana para integrarnos con las herramientas más comunes dentro de cualquier organización. Las métricas que hemos añadido a Grafana son consumo de memoria de nuestro cluster de Starburst, consultas realizadas por los usuarios, consultas con errores, memoria total de nuestro cluster de Kubernetes y Workers activos. Una vez integradas dichas métricas, el dashboard que tendríamos sería el siguiente:

Una vez integrado con Grafana, podríamos crearnos alertas de envío de mensajes por si hay algún problema en nuestro cluster de Starburst, y así tener todo el flujo de operaciones cubierto para evitarnos dolores de cabeza si hubiera algún tipo de incidencia o indisponibilidad.

El dashboard está publicado en Grafana[14] para que cualquier persona pueda hacer uso de él.

Conclusiones

Desde hace ya unos años, las grandes corporaciones se enfrentan a un desafío común cuando intentan compartir y analizar información entre departamentos ya que cada departamento almacena y gestiona sus datos de manera aislada. Estos silos dificultan el acceso y la integración de datos, lo que impide una visión completa y unificada de la información empresarial. La falta de interoperabilidad entre los silos de datos obstaculiza la toma de decisiones informada, ralentiza los procesos analíticos y limita la capacidad de las organizaciones para obtener una ventaja competitiva. Si tu organización se encuentra en una situación similar, Starburst es tu herramienta.

Starburst te facilita el acceso y análisis a todos estos silos de información y da la capacidad de federar datos de diversas fuentes y ubicaciones, ya sea datos en el Cloud o en tu datacenter onpremise. Permite realizar consultas en tiempo real sin necesidad de mover o transformar los datos previamente. Esto agiliza el proceso analítico y brinda a las organizaciones una visión 360 de sus datos. Además, no solo te ayuda a la hora de consultar datos de distintas fuentes, sino que también te puede ayudar en tus migraciones al Cloud, ya que te permite consultar cualquier origen y volcar dicha información en un almacenamiento como S3 o GCS en formato de ficheros abierto, como puede ser Parquet.

Una de las principales ventajas de Starburst, es que te permite desplegar la infraestructura en Kubernetes para aprovechar así todo su potencial. Kubernetes te da la capacidad de adaptarse dinámicamente a la carga de trabajo. Con esta función, los clústeres de Starburst pueden aumentar o disminuir automáticamente el número de Workers según la demanda. Esto permite optimizar el uso de recursos y garantizar un rendimiento óptimo, ya que los pods adicionales se crean cuando la carga aumenta y se eliminan cuando disminuye. Esto dentro de cualquier organización es un punto muy importante, ya que mejora la eficiencia operativa al minimizar el tiempo de inactividad y los costos innecesarios, al tiempo que asegura una disponibilidad constante y una respuesta ágil a los picos de trabajo. Además, una cosa a tener en cuenta es que puedes realizar la instalación de Starburst tanto en cualquiera de los Cloud, como en onpremise.

Además, también te permite tener un roleado y gobierno de los usuarios dentro de tu plataforma, dando una granularidad a nivel de acceso a los datos a cada usuario, permitiéndote crear roles para ciertos esquemas, tablas o hasta columnas y filas dentro de una tabla.

Los que trabajamos con datos sabemos de la dificultad de trabajar con multitud de fuentes de datos, entornos diversos, herramientas de todo tipo, etc. Uno de los puntos más diferenciales de Starburst es tener la capacidad de consultar los datos desde su almacenamiento, eliminando duplicidad de información, pudiendo así tener una mejor eficiencia en cuanto al storage, y facilitando también el gobierno de estos datos.

En conclusión, Starburst es una herramienta a tener en cuenta si quieres llevar a tu organización al siguiente nivel en el mundo de los datos, o si te estás planteando una estrategia de datos con una visión y una filosofía más orientada al data mesh.

Referencias

[1] Qué es Starburst.[link] 

[2] Qué es Trino. [link]

[3] Principios del Data Mesh. [link]

[4] Introducción  a DBT. [link]

[5] Introducción a Jupyter Notebook. [link]

[6] Introducción a Power BI. [link]

[7] Qué es Prometheus.. [link]

[8] Qué es Grafana. [link]

[9] Qué es Terraform. [link]

[10] Qué es Jmeter.[link]

[11] Módulo de GKE.[link]

[12] Módulo de VPC.[link]

[13] Qué es TPCH.[link]

[14] Dashboard Grafana.[link]

[15] Repositorio de Github con el despliegue.[link]

Navegación

Lucas Calvo

Cloud Engineer

¿Quieres saber más de lo que ofrecemos y ver otros casos de éxito?
DESCUBRE BLUETAB

SOLUCIONES, SOMOS EXPERTOS

DATA STRATEGY
DATA FABRIC
AUGMENTED ANALYTICS

Te puede interesar

LA BANCA Y LA ERA DEL OPEN DATA

abril 19, 2023
LEER MÁS

Gobierno del Dato: Una mirada en la realidad y el futuro

mayo 18, 2022
LEER MÁS

Snowflake: Zero-Copy clone, o cómo librarte del duplicado de datos al clonar.

marzo 22, 2023
LEER MÁS

Serverless Microservices

octubre 14, 2021
LEER MÁS

Cómo preparar la certificación AWS Data Analytics – Specialty

noviembre 17, 2021
LEER MÁS

Mi experiencia en el mundo de Big Data – Parte I

octubre 14, 2021
LEER MÁS

Publicado en: Blog, Practices, Tech

CDKTF: Otro paso en el viaje del DevOps, introducción y beneficios.

mayo 9, 2023 by Bluetab

CDKTF: Otro paso en el viaje del DevOps, introducción y beneficios.

Lucas Calvo

Cloud Engineer

Introducción

En este artículo vamos a hablar de CDKTF y de cómo utilizar todas sus ventajas para desplegar infraestructura de forma programática y reutilizable en GCP. También veremos cómo integrar CDKTF con tus módulos de terraform[1] para desplegar infraestructura más reutilizable bajo la supervisión de tu organización.

CDKTF abre un mundo de posibilidades para llevar a nuestra organización al siguiente nivel de automatización, además de facilitar el despliegue de la infraestructura a las personas más cercanas a la parte de desarrollo. En este artículo daremos algunas indicaciones de cuando es una buena opción utilizar CDKTF y cuando seguir utilizando terraform a través de HCL, ya que no en todos los casos de usos el CDKTF nos aportará un valor añadido.

¿Qué necesitas para entender este artículo?

  • Algunos conceptos sobre Terraform[2].
  • Instalar el CDKTF [3].
  • Algunos conceptos sobre python.
  • Necesitas una cuenta gratuita en GCP.

Todo el código utilizado en este artículo está en el repositorio[4] de Github.

¿Es CDKTF la solución milagrosa para los despliegues en nuestra organización? Veámoslo.

¿Que es el CDKTF?

CDKTF, también llamado Cloud Development Kit for Terraform, permite definir y aprovisionar infraestructura de forma programática. En este artículo utilizaremos python para desplegar algunos recursos en GCP. El punto fuerte de CDKTF es que no necesitas aprender HashiCorp Configuration Language (HCL), sólo necesitas saber Python que es más flexible que HCL porque te permite crear más integraciones con herramientas de tu organización y con otras APIs. Incluso puedes crear algunas clases específicas en Python para hacer tu código más reutilizable.

Primeros pasos con CDKTF

Una vez explicado CDKTF, procederemos a crear nuestro primer proyecto. Para ello desplegaremos un cloud storage y un topic de pubsub en GCP, utilizaremos recursos terraform por simplicidad. Comenzaremos explicando varios comandos del CDKTF:

  • cdktf init –template=python

Este comando crea un nuevo proyecto CDK para Terraform usando una plantilla. Esto es muy útil cuando se quiere empezar a utilizar un nuevo proveedor, en nuestro caso el proveedor de Google.

Una vez ejecutado este comando veremos la siguiente plantilla:

Los ficheros más importantes son `main.py` y `cdktf.json`. Hablemos de ellos.

En el fichero `main.py` es donde se declara toda la infraestructura que vamos a desplegar con su lógica. Haremos uso del proveedor de Google para definir nuestros recursos, `cloud storage` y `pubsub topic`. Luego para definir e importar el proveedor de google y la librería de almacenamiento y pubsub importaremos los siguientes módulos en python:

```python
from imports.google.provider import GoogleProvider
from imports.google.storage_bucket import StorageBucket
from imports.google.pubsub_topic import PubsubTopic
``` 

Estos proveedores se definen en el archivo `cdktf.json`, este archivo es donde puedes proporcionar los ajustes de configuración personalizados para tu aplicación y definir los proveedores y módulos que deseas utilizar. Cuando inicializamos la plantilla con el comando `cdktf init –template=python`, la plantilla genera un archivo `cdktf.json` básico en tu directorio raíz que puedes personalizar para tu aplicación.

Este archivo tiene la siguiente información:

```json
{
  "language": "python",
  "app": "pipenv run python main.py",
  "projectId": "da305019-c0fc-4e47-b4ad-1a705cdd8811",
  "sendCrashReports": "false",
  "terraformProviders": ["google@~> 4.0"],
  "terraformModules": [],
  "codeMakerOutput": "imports",
  "context": {
    "excludeStackIdFromLogicalIds": "true",
    "allowSepCharsInLogicalIds": "true"
  }
}
``` 

En la línea terraformProviders hemos definido el proveedor de google que contiene todos los recursos que necesitamos. En la sección Integración con tus propios módulos aprenderemos a configurar este fichero para utilizar tus propios módulos terraform.

Una vez configurados los proveedores ya podemos definir nuestros recursos con Python:

```python
class MyStack(TerraformStack):
    def __init__(self, scope: Construct, id: str):
        super().__init__(scope, id)

        GoogleProvider(self, "google", region="europe-west4",project="xxxxx")
        length = 5
        suffix = ''.join((random.choice(string.ascii_lowercase) for x in range(length)))
        bucket = StorageBucket(self, "gcs", name = "cdktf-test-1234-bt-"+ str(suffix), location = "EU", force_destroy = True)
        topic = PubsubTopic(self, "topic" ,name = "cdktf-topic", labels={"tool":"cdktf"})
        TerraformOutput(self,"bucket_self_link",value=bucket.self_link)
        TerraformOutput(self,"topic-id",value=topic.id)

app = App()
MyStack(app, "first_steps")

app.synth()
``` 

Estas líneas de código despliegan un cloud storage y un topic como hemos dicho previamente, también hemos creado un `string` aleatorio en python para añadir al cloud storage como sufijo. Para ello hemos añadido dos librerías más: `string` y `random`. Además, hemos añadido a nuestro script algunas salidas para ver alguna información importante sobre nuestro despliegue como `topic_id` o `bucket_self_link`.

El resultado final de nuestros primeros scripts con CDKTF es el siguiente:

```python
from constructs import Construct
from cdktf import App, TerraformStack, TerraformOutput
from imports.google.provider import GoogleProvider
from imports.google.storage_bucket import StorageBucket
from imports.google.pubsub_topic import PubsubTopic
import random
import string

class MyStack(TerraformStack):
    def __init__(self, scope: Construct, id: str):
        super().__init__(scope, id)

        GoogleProvider(self, "google", region="europe-west4",project="xxxxx")
        length = 5
        suffix = ''.join((random.choice(string.ascii_lowercase) for x in range(length)))
        bucket = StorageBucket(self, "gcs", name = "cdktf-test-1234-bt-"+ str(suffix), location = "EU", force_destroy = True)
        topic = PubsubTopic(self, "topic" ,name = "cdktf-topic", labels={"tool":"cdktf"})
        TerraformOutput(self,"bucket_self_link",value=bucket.self_link)
        TerraformOutput(self,"topic-id",value=topic.id)

app = App()
MyStack(app, "first_steps")

app.synth()
``` 

Ahora podemos desplegar nuestra infraestructura, para ello necesitamos ejecutar algunos comandos con CDKTF. En primer lugar, tenemos que descargar los proveedores y módulos para una aplicación y generar las construcciones CDK para ellos. Para ello utilizamos `cdktf get`. Utiliza el archivo de configuración `cdktf.json` para leer la lista de proveedores. Este comando sólo genera los bindings de los proveedores que faltan, por lo que es muy rápido si nada ha cambiado.

```bash
cdktf get
``` 

Esta es la salida del comando:

Usamos el flag –force para recrear todos los bindings. Con el proveedor descargado procederemos al despliegue ejecutando el comando `cdktf deploy`:

```bash
cdktf deploy
``` 

Esta es la salida del comando:

Con todos estos pasos hemos procedido a desplegar nuestra primera aplicación con el CDKTF. Algo bastante sencillo y con código muy reutilizable. Ahora vamos a proceder a la destrucción de la infraestructura para no incurrir en ningún coste. Utilizaremos el comando `cdktf destroy`.

Integraciones con tus propios módulos

Perfecto, una vez comprobado cómo funciona el CDKTF vamos a integrarlo con los módulos terraform que se desarrollan en nuestra empresa. Esto nos permitiría hacer el código mucho más reutilizable permitiendo que todo lo que se despliegue en el CDKTF se despliegue con los patrones que hemos definido en los módulos. Para esta prueba ejecutaremos la misma creación (gcs y topic) pero esta vez haciendo uso de los módulos previamente desarrollados que podéis encontrar en el siguiente repositorio.

  • Cloud Storage[5]
  • Pubsub[6]

Estos módulos han sido desarrollados con HCL y tienen ciertas nomenclaturas y lógica para facilitar al máximo el despliegue al resto de desarrolladores de mi organización.

Así que procedamos a crear otra plantilla con el comando `cdktf init –template=python` pero esta vez para usar nuestros propios módulos.

Una vez ejecutado tenemos la misma plantilla que en el apartado anterior. Ahora vamos a proceder a modificar el `cdktf.json` para añadir los módulos que vamos a utilizar y dos proveedores, google y google-beta, que son necesarios para el uso de estos módulos.

Este es el fichero `cdktf.json`:

```json
{
  "language": "python",
  "app": "pipenv run python main.py",
  "projectId": "f02a016f-d673-4390-86db-65348eadfb3f",
  "sendCrashReports": "false",
  "terraformProviders": ["google@~> 4.0", "google-beta@~> 4.0"],
  "terraformModules": [
    {
      "name": "gcp_pubsub",
      "source": "git::https://github.com/lucasberlang/gcp-pubsub.git?ref=v1.2.0"
    },
    {
      "name": "gcp_cloud_storage",
      "source": "git::https://github.com/lucasberlang/gcp-cloud-storage.git?ref=v1.2.0"
    }
  ],
  "codeMakerOutput": "imports",
  "context": {
    "excludeStackIdFromLogicalIds": "true",
    "allowSepCharsInLogicalIds": "true"
  }
}
```
 

Hemos añadido la línea terraform Modules donde indicamos el nombre del módulo y la fuente, en este caso nuestro repositorio de github. También hemos añadido la línea terraform providers como en el apartado anterior.

Una vez añadidos los proveedores y los módulos terraform vamos a instanciarlos en nuestro main, para ello solo tenemos que añadirlos como librerías y luego invocarlos con los parámetros que estén definidos en nuestro módulo. Puedes ir al readme del módulo que está subido en github para ver que parámetros son obligatorios y cuales son opcionales, también puedes ver salidas de esos módulos.

El código quedaría de la siguiente manera:

```python
#!/usr/bin/env python
from constructs import Construct
from cdktf import App, TerraformStack, TerraformOutput
from imports.google.provider import GoogleProvider
from imports.google_beta.provider import GoogleBetaProvider
from imports.gcp_pubsub import GcpPubsub
from imports.gcp_cloud_storage import GcpCloudStorage
import random
import string

class MyStack(TerraformStack):
    def __init__(self, scope: Construct, ns: str):
        super().__init__(scope, ns)
        GoogleProvider(self, "google", region="europe-west4")
        GoogleBetaProvider(self, "google-beta", region="europe-west4")
        length = 5
        suffix = ''.join((random.choice(string.ascii_lowercase) for x in range(length)))
        tags = {"provider" : "go",
                "region" : "euw4",
                "enterprise" : "bt",
                "account" : "poc",
                "system" : "ts",
                "environment" : "poc",
                "cmdb_name" : "",
                "security_exposure_level" : "mz",
                "status" : "",
                "on_service" : "yes"}

        topic = GcpPubsub(self,"topic",
          name = "cdktf-topic",
          project_id = "xxxxxxx",
          offset = 1,
          tags = tags)
          
        bucket = GcpCloudStorage(self,"bucket",
          name = "cdktf-test-1234-bt-" + suffix,
          project_id = "xxxxxxx",
          offset = 1,
          location = "europe-west4",
          force_destroy = True,
          tags = tags)
        
        TerraformOutput(self,"topic_id",value=topic.id_output)
        TerraformOutput(self,"bucket_self_link",value=bucket.bucket_output)

app = App()
MyStack(app, "cdktf_modules")

app.synth()
```
 

Para invocar nuestros módulos que hemos añadido previamente en el archivo `cdktf.json`, sólo tenemos que añadir este código:

```python
from imports.gcp_pubsub import GcpPubsub
from imports.gcp_cloud_storage import GcpCloudStorage
``` 

El resto del código es la invocación de nuestros módulos con una serie de parámetros para inicializarlos, como región, nombre, etc. También hemos añadido las salidas para tener algo de información sobre la creación de los recursos en GCP. Ahora, vamos a proceder al despliegue de los recursos para comprobar el correcto funcionamiento de CDKTF.

```bash
cdktf get --force
cdktf deploy
``` 

Una vez desplegada, comprobaremos nuestra infraestructura en GCP y procederemos a borrar toda con el comando `cdktf destroy`.

Evoluciones que puedes añadir a tu empresa

Gracias al CDKTF podemos crear nuevos automatismos mucho más nativos que con el HCL tradicional ya que podemos integrarnos con todo tipo de backend en nuestro propio desarrollo. Esto abre todo un nuevo mundo de posibilidades en el despliegue automático de infraestructuras.

Por ejemplo, si en tu empresa siempre te piden el mismo tipo de infraestructura desde los equipos de desarrollo, como una base de datos, un cluster kubernetes y luego los componentes de seguridad y comunicaciones asociados al caso de uso, ¿por qué no automatizar este proceso y no crear proyectos terraform a la carta?.

Podemos evolucionar nuestra plataforma de automatización creando un portal web que invoque a nuestro microservicio hecho con el CDKTF que hará las validaciones oportunas y luego procederá al despliegue. Esto también se podría hacer con terraform pero no de una forma tan nativa como con el CDKTF ya que ahora usando python (u otro lenguaje, Typescript, Go etc…) podemos crear flujos de trabajo mucho más complejos llamando a otros backends y haciendo todo tipo de integraciones con nuestras herramientas corporativas. Podríamos generar una plataforma de despliegue para automatizar todos nuestros despliegues genéricos que nos solicitan desde otros equipos como aplicaciones, analítica de datos, reporting, etc. Podríamos crear la siguiente arquitectura para resolver este problema:

Conclusiones

Después de haber trabajado varios años con terraform creo que el CDKTF es su evolución natural, aunque todavía está en una fase prematura. No cuenta con una comunidad tan grande como la que terraform tiene con HCL, lo que hace difícil iniciarse con esta herramienta. Depurar el código suele ser complicado y no tan fácil como con HCL. Los tutoriales oficiales no son muy completos por lo que muchas veces tendrás que encontrar tu propio camino para resolver algunos problemas derivados del uso de CDKTF. También creo que el CDKTF está en un punto de madurez como lo estaba terraform hace años en la versión inferior a la 0.11.0, es decir, funciona bien aunque todavía le queda mucho camino por recorrer.

Creo que si tu empresa ya utiliza terraform (HCL) de forma madura, cambiar el modelo a CDKTF no va a suponer grandes beneficios. El único beneficio de usar CDKTF es en un caso de uso como el mencionado en la sección anterior, donde puedes mezclar el uso de tus módulos ya desarrollados con HCL y CDKTF para llevar la automatización de cierta infraestructura a un nivel superior.

Por otro lado, CDKTF es una herramienta que podría recomendar si conoces python (u otros lenguajes) y no quieres aprender un lenguaje específico como HCL. CDKTF puede ser una buena herramienta si tu empresa no está en un punto de madurez avanzado con terraform o cualquier herramienta de IaC. El CDKTF te permite desarrollar de una forma más sencilla tu infraestructura como código, las integraciones con otras herramientas dentro de tu organización serán mucho más sencillas ya que podrás utilizar tu lenguaje de programación favorito para realizarlas. Puede crear clases y módulos reutilizables de forma sencilla, creando una comunidad de desarrollo CDKTF dentro de su propia empresa y permitiendo a los desarrolladores estar más apegados a la infraestructura, lo que siempre es un reto. También la parte de pruebas de tu código CDKTF será mucho más fácil y nativa haciendo uso de pytest u otros frameworks [7]. Probar con terraform (HCL) es más tedioso y ya tienes que usar frameworks como terratest para integrarlos en tu código.

En general creo que CDKTF es una buena herramienta y es la evolución natural de Terraform. Si queremos llevar nuestra automatización a otro nivel e integrarla con portales web o herramientas organizativas, CDKTF es la herramienta que necesitamos. También abre un mundo de posibilidades para los equipos de desarrollo, ya que podrán desplegar cualquier tipo de infraestructura utilizando un lenguaje de programación. Habrá que ver cómo evoluciona para ver cómo encaja en nuestras organizaciones y si alcanza el punto de madurez que ha alcanzado Terraform.

Referencias

[1] Ques es terraform.[link]

[2] Módulos de Terraform. [link]

[3] Guía de instalación del CDKTF. [link]

[4] Repositorio de CKDTF GitHub. [link]

[5] Repositorio de Cloud storage GitHub. [link]

[6] Repositorio de Pubsub GitHub. [link]

[7] Frameworks de testing.. [link]

Navegación

Lucas Calvo

Cloud Engineer

¿Quieres saber más de lo que ofrecemos y ver otros casos de éxito?
DESCUBRE BLUETAB

SOLUCIONES, SOMOS EXPERTOS

DATA STRATEGY
DATA FABRIC
AUGMENTED ANALYTICS

Te puede interesar

Databricks sobre Azure – Una perspectiva de Arquitectura (parte 1)

febrero 15, 2022
LEER MÁS

¿Cómo gobernar los productos de datos? Hacia un gobierno de los datos federado

mayo 12, 2022
LEER MÁS

Workshop Ingeniería del caos sobre Kubernetes con Litmus

julio 7, 2021
LEER MÁS

DataOps

octubre 24, 2023
LEER MÁS

Conceptos básicos de AWS Glue

julio 22, 2020
LEER MÁS

Hashicorp Boundary

diciembre 3, 2020
LEER MÁS

Publicado en: Blog, Practices, Tech

LakeHouse Streaming en AWS con Apache Flink y Hudi

abril 11, 2023 by Bluetab

LakeHouse Streaming en AWS con Apache Flink y Hudi

Alberto Jaen

AWS Cloud Engineer

Alfonso Jerez

AWS Cloud Engineer

Adrián Jiménez

AWS Cloud Engineer

Introducción

Cada día la ingesta y procesamiento de streams de datos en Near Real Time (NRT) es más necesario. Los requisitos de negocio son cada vez más exigentes en cuanto a tiempos de procesamiento y la disponibilidad de los datos más recientes y este artículo pretende abordar esta cuestión.

Utilizando la nube de AWS y con un enfoque serverless se desplegará en este artículo una aplicación capaz de ingestar streams de datos y procesarlos en NRT, escribiendo su resultado en un LakeHouse de tal manera que se puedan realizar operaciones ACID (Atomicidad, Consistencia, Aislamiento y Durabilidad) sobre estos. Se desplegará una arquitectura en la que se ingestan datos con Locust, se procesan con Flink y finalmente se escriben en Hudi y JSON.

Locust es un framework de Python que sirve para poder realizar Load Testing de una manera fácil y escalable. Las ventajas que ofrece Locust son la capacidad de poder definir este comportamiento de los usuarios con un lenguaje de propósito general y su facilidad de escalado.

Flink se ha convertido en un framework de referencia en el ámbito de procesamiento distribuido sobre streams de datos. Se caracteriza por su orientación al procesamiento de streams (aunque también puede ejecutar procesos batch), su rapidez de procesamiento y su eficiencia en el uso de memoria. Hay otros frameworks populares en el sector, como Spark Streaming y Storm, en el apartado de arquitectura se discutirá por qué en última instancia Flink ha sido el elegido.

Finalmente, Hudi es un formato de fichero transaccional que proporciona las habilidades propias de una base de datos y DataWarehouse al Data Lake. Hudi da la capacidad de dejar atrás los conceptos de batching y sustituirlo con una perspectiva de procesamiento incremental. Como el resto de las tecnologías usadas en este artículo, se describe en detalle más adelante.

Todo el código utilizado en este artículo, tanto IaC como de Python, puede visitarse en nuestro repositorio[1] en Github.

En próximos artículos

Múltiples artículos utilizarán este como base para hablar de los siguientes temas:

  • Comparativa en cuanto a eficiencia de procesamiento, escritura y lectura de ficheros y costes en JSON vs Hudi.
  • Comparativa de MOR vs COW, además del consumo de estas tablas por los distintos tipos de queries (Snapshot, Read Optimized, Incremental).
  • Escalabilidad.
  • Otras formas de explotación del dato, como pueden ser Redshift o Pinot.

Arquitectura

A continuación se puede ver la arquitectura a alto nivel que se desplegará:

Como se puede ver, se utiliza Locust como herramienta de Load Testing para enviar datos sintéticos a nuestra aplicación. Estos serán ingestados a través de un Kinesis Stream aprovisionado en modo On Demand, de esta manera el stream escalará de manera automática. La alternativa al modo On Demand es el modo Provisioned, donde debemos especificar el número de shards (componente en los que se divide el stream), con el que queremos aprovisionar el stream. Las diferencias y particularidades de estos dos modos se explicarán más en detalle en el apartado de Kinesis.

Del stream de entrada leen las dos aplicaciones de Kinesis Analytics Flink. Como se mencionó en el apartado de próximos pasos, la razón de tener dos aplicaciones independientes escribiendo en Hudi y JSON respectivamente es para realizar una comparativa en próximos artículos en cuanto a eficiencia. Finalmente los datos se alojarán en S3, el servicio de almacenamiento de objetos de AWS.

La particularidad que tiene la aplicación de Kinesis Analytics Flink es que es serverless, es decir, abstrae al desarrollador de la complejidad de configurar y desplegar un cluster con Flink. A esta aplicación se deben asignar unos KPUs o Kinesis Processing Units y un jar con la librería de Flink y los conectores necesarios para poder desplegarla correctamente. Todos estos conceptos serán explicados en los siguientes apartados.

La alternativa a esta perspectiva serverless con un servicio administrado en AWS es la administración completa de la aplicación por parte del desarrollador, pudiendo utilizar herramientas como Kubernetes o EKS (Kubernetes administrado en AWS) para poder desplegar en un cluster esta aplicación Flink. Las ventajas de esta segunda alternativa sería el poder configurar tanto el cluster (número de nodos, memoria, CPU, disco duro, etc…) como la aplicación Flink (gestión de disaster recovery, gestión de metadatos, etc…) con un grado de detalle mucho mayor. En este artículo se decidió la primera alternativa por su simplicidad y facilidad de uso a la hora de conocer el framework de Flink.

Locust

La primera pieza en la pipeline de ingesta de datos es el componente de Locust escrito en Python. A diferencia de otros frameworks disponibles en el mercado como JMeter, Locust nos da la capacidad de poder escribir un código simple con Python en vez de utilizar un lenguaje específico a un dominio o una interfaz de usuario.

Además, Locust está basado en eventos y utiliza greenlet[2], lo que le da la capacidad de con un solo hilo del procesador poder administrar la capacidad de varios miles de usuarios. Por ejemplo, en el caso de JMeter, se necesita un hilo para cada usuario, lo que supone un problema de escalabilidad para casos en los que se necesite un número alto de estos.

Locust tiene varias posibilidades a la hora de ejecutarse y escalar, pudiendo funcionar en local para aplicaciones con menos exigencias en cuanto a volumen de datos o desplegar en un cluster de Kubernetes al crear una imagen de Docker a raíz del código de Locust.

En cuanto a clientes y sistemas a los que enviar datos, Locust proporciona un cliente HTTP integrado. En el caso de querer enviar eventos a otros sistemas, como el de este artículo, siempre se puede escribir un cliente personalizado gracias a la ventaja de ser un framework de Python.

Además, Locust también proporciona una interfaz web para poder comprobar el progreso de tu envío de datos en tiempo real. Por todas estas razones se ha decidido utilizar esta tecnología en este artículo.

Kinesis Data Analytics

Para la ingesta de datos, se utilizará Kinesis Data Streams, un servicio de streaming de datos completamente administrado y serverless ofrecido por AWS. Un Kinesis Stream está formado por una agrupación lógica de shards, que representan la unidad fundamental de capacidad de un stream y son procesados en paralelo. Cada shard dota al stream de 1 MB/s o 1,000 eventos por segundo de escritura y 2 MB/s de lectura. Los eventos serán distribuidos entre los shards de un stream en función de su clave de partición, por lo que es importante que el particionado sea homogéneo para evitar un sesgo en la distribución y ocurrencia de hot shards. Existen dos modos de aprovisionamiento de capacidad:

  • On Demand – el número de shards se gestiona automáticamente para acomodar la carga, asegurando un rendimiento óptimo sin necesidad de ajustes manuales.
  • Provisioned – debes especificar el número de shards para el stream en función de la carga esperada.

Por simplicidad, y por ser idóneo para nuestro caso de uso, se optará por el modo On Demand. Esto acomodará automáticamente el número de shards a la cantidad de datos generados por nuestra aplicación de Locust.

Para leer y procesar los datos ingestados a través de Kinesis Data Streams, se usará otro servicio de la familia Kinesis, Kinesis Data Analytics (KDA). Este servicio es ofrecido en dos sabores

  • Kinesis Analytics SQL – Permite la creación de aplicaciones de procesamiento de datos en streaming mediante el uso de SQL. Este servicio se considera deprecado en favor del servicio de KDA for Apache Flink.
  • Kinesis Analytics for Apache Flink – Proporciona una forma de desplegar un cluster de Flink gestionado por AWS. El uso de Flink faculta la creación de aplicaciones más avanzadas y con mayor rendimiento.

Una aplicación de Flink consta de una serie de tareas de procesado en paralelo, también conocidas como operadores, que se conectan en una Directed Acyclic Graph (DAG). El stream de datos es procesado por esta DAG, con cada operador ejecutando una operación específica sobre el dato.

KDA asigna potencia de computación para nuestra aplicación en forma de Kinesis Processing (KPUs), cada una de ellas equivalente a 1 vCPU y 4GB de RAM. Se determina el número de KPUs para la aplicación mediante la especificación de dos parámetros:

  • Parallelism – Número de tareas que se pueden ejecutar concurrentemente.
  • ParallelismPerKPU – Número de tareas que pueden ejecutarse en una única KPU.

El número total de KPUs de la aplicación viene dado por Parallelism / ParallelismPerKPU. Es posible desplegar este servicio con autoescalado automático, que ajustará automáticamente el número de KPUs en función del consumo de CPU para acomodar la demanda.

Figure 1. KDA configuration with Parallelism 4 and ParallelismPerKPU 2

Los costos[3] de Amazon Kinesis Analytics se basan en un modelo pay-per-use, apoyándose en las Kinesis Processing Units consumidas. Además, se asume un coste por el almacenamiento usado por la aplicación y sus copias de seguridad.

Flink

Profundizando más en la aplicación de Flink, una de las características más importantes es la capacidad de ser resiliente a fallos. Para ello, Flink incorpora un sistema de checkpointing mediante el cual se toma un snapshot de la aplicación y su estado que es guardado en un almacenamiento remoto en caso de que sea necesario recuperar la aplicación.

El proceso de checkpointing de una aplicación de Flink está diseñado para ser resiliente y eficiente. Flink puede hacer uso de diferentes backends para guardar el estado de la aplicación. El más simple sería la memoría de la propia Java Virtual Machine, y aunque esto ofrece baja latencia y una gestión más simple, rápidamente pueden surgir problemas de escalado y capacidad que no lo hacen recomendable para entornos de producción. Por eso es común el uso de RocksDB como backend de Flink, una base de datos de clave-valor con alto rendimiento, escalable y con tolerancia a fallos. Adicionalmente KDA guarda estos snapshots en S3 para una capa extra de durabilidad.

Para el propósito de este blog, se ha desarrollado una sencilla aplicación de  ingesta de datos en tiempo real y su posterior guardado en S3. Flink ofrece dos APIs mediante las cuales puedes desarrollar una aplicación:

  • DataStream API – Es una API basada en el concepto de streams. Ofrece control a bajo nivel de la aplicación con la desventaja de requerir un mayor esfuerzo por parte del desarrollador.
  • Table API – Esta API se basa en el concepto de tablas. Ofrece una manera declarativa de desarrollar la aplicación mediante el uso de expresiones SQL. Conlleva una pérdida de control sobre los detalles de la aplicación en favor de ser mucho más sencilla.

Para este caso de uso se usará la Table API por su simplicidad, pero es igualmente compatible con el uso de la DataStream API.

A la hora de desplegar la aplicación con Kinesis Data Analytics sólo es necesario definir el punto de entrada del código de la aplicación y proporcionar un uber jar con todas las dependencias de esta. Conviene explicar las dependencias usadas para esta aplicación, pues suele ser uno de los mayores puntos de fricción a la hora desarrollar una aplicación de Flink:

  • SQL connector for Kinesis – Conector fundamental para que nuestra aplicación de Flink sea capaz de leer de un Kinesis Stream.
  • S3 Filesystem for Hadoop – Permite a la aplicación operar sobre S3.
  • Hudi Bundle – Paquete proporcionado por los desarrolladores de Hudi, con todas las dependencias necesarias para trabajar con la tecnología.
  • Hadoop MapReduce Client Core – Dependencia adicional necesaria para que la escritura a Hudi funcione correctamente en KDA. Es posible que en futuras versiones del Hudi Bundle esta dependencia no sea necesaria.

 La aplicación está preparada para escribir datos tanto en formato JSON como en tablas de Hudi MoR o CoW (que se explicarán en detalle en la siguiente sección). Tanto el código de la aplicación como la infraestructura están disponibles en el repositorio.

Hudi

Conceptos

Hudi se presenta como una fuente de almacenamiento Open Source a nivel de formato de datos. Al igual que hacen otras soluciones como Iceberg o Delta Lake, ofrece algunas propiedades ya existentes en estas como es el soporte de transacciones ACID (Atomicidad, Consistencia, Aislamiento y Durabilidad), procesos enfocados a la optimización de tareas de lectura/escritura, actualización de datos incrementales y otras que se explicarán a continuación. Es importante resaltar que estas no podrían conseguirse mediante ficheros de formato Avro y Parquet.

Las características que presenta Hudi son las siguientes:

  • Transacciones ACID: unas de las principales ventajas que ofrece Apache Hudi es el soporte para transacciones ACID, posibilitando que las operaciones de escritura sean atómicas y consistentes. Además también proporciona que los datos estén aislados y sean duraderos, lo que garantiza la integridad de los datos y la consistencia del sistema. Más adelante se analizará más en detalle cómo las distintas formas de almacenamiento lo hacen posible y las ventajas que estas ofrecen.
  • Pipelines Incrementales: la clusterización de los eventos en función de variables de negocio permite que tareas de borrado/actualización de datos se puedan realizar de una forma más eficiente si estas se encuentran indexadas de forma conjunta aunque no se hayan dado en la misma franja temporal.
  • Ingesta en Streaming: Hudi permite obtener unos workloads computacionalmente menos pesados a través de Upserts que recurren a una indentación optimizada[4] por grupos de archivos, lo que hace que en tareas de escritura (Update/Append/Delete) sean más eficientes. Esto permite que muchas de las aplicaciones basadas en Hudi no deban ser deduplicadas.
  • Queries de estados previos de los datos – Time Travel: Hudi permite actualizar y consultar información de particiones pasadas sin la necesidad de tener que reprocesar ni incluir particiones temporales mayores. De esta manera se asegura que eventos enviados con posterioridad no sean procesados y sean correctamente almacenados.
  • Tareas de escritura simultáneas: mediante OCC (Optimistic Concurrency Control[5]) se permite que muchas de las tareas como Upsert e Insert puedan realizarse correctamente aun realizándose de forma simultánea.

A la hora de analizar cómo Hudi procede a realizar el almacenamiento de los eventos ingestados, estos son agrupados por particiones y estas a su vez agrupadas en grupos de archivos. Estos últimos teniendo asignado un file_id único para cada grupo en el cual se encuentra el base file, en formato parquet, el cual surge tras una acción, ya sea un commit o  compactación, y el log file que es donde se encuentran registrados todas las actualizaciones realizadas (event version tracking).

Tipos de Tablas y Queries

Hudi ofrece 2 tipos de tablas en función de la necesidad de negocio, esto tiene un impacto a nivel de performance y limitación de ciertas funcionalidades como se verán en más detalle:

Copy on Write (COW)

Sistema de almacenamiento mediante el cual en las tareas de actualización, eliminación o registro de nuevos datos se realizan directamente sobre el archivo de logs (delta file) y se crea una nueva instantánea que incluye una copia completa del conjunto de datos actualizado, incluyendo una nueva versión del base file y un archivo delta que contiene los cambios realizados en esa operación.

No es hasta la compactación de datos (programada o al alcanzar un tamaño de datos definido) cuando se realiza la combinación de los archivos delta con la versión más reciente del conjunto de datos completo.Se crea así un nuevo archivo completo donde se eliminan los archivos delta que ya no son necesarios, actualizando a su vez el archivo de índice para que pueda acceder a los datos del archivo compactado.

Este sistema de almacenamiento está especialmente recomendado para casos de uso en los que las tareas de lectura sean más frecuentes que las de escritura al no requerir de  transformaciones de datos adicionales al leer los datos. A continuación se muestra el Timeline de los principales archivos al realizarse las distintas tareas de escritura:

Acción NUEVO archivo base Archivo delta Archivo de índice Snapshot
Nuevo registro
Se escribe el registro en el archivo base
No se crea un archivo delta
Se actualiza el archivo de índice con el nuevo registro
No se crea un nuevo snapshot
Actualización de registro existente
Se escribe el registro actualizando en un nuevo archivo base
Se escribe el registro actualizando en el archivo delta
Se actualiza el archivo de índice con la versión actualizada del registro
No se crea un nuevo snapshot
Eliminación de registro
No se escribe el registro eliminado en el nuevo archivo
Se escribe una marca de eliminación en un nuevo archivo delta
Se actualiza el archivo de índice con la marca de eliminación
No se crea un nuevo snapshot
Compactación de archivos delta
Se fusionan los archivos delta en un nuevo archivo base
No se crea un nuevo archivo delta
Se crea un nuevo archivo índice que contiene todas las entradas del índice de los archivos fusionados
Se crea un nuevo snapshot que refleja el estado actual de los datos después de la compactación

Merge On-Read (MOR)

En este caso, no se utilizan delta files separados como en el modelo Copy-on-Write (COW). En su lugar, los cambios se escriben directamente en los archivos de datos existentes (base files). En las tareas en las que se realizan actualizaciones de registros, estos nuevos son añadidos en el base file, y en el caso de eliminación, estos son marcados como tal en el base file, en ambos casos estos cambios son registrados en el archivo de índice, hasta que se realiza la compactación. Es en esta operación donde se aplican todas las actualizaciones a los registros en el archivo base correspondiente y elimina las versiones anteriores de los registros actualizados. 

Esta alternativa está especializada en realizar consultas de datos históricos versionados y transformaciones y análisis NRT de grandes volúmenes, ya que es posible realizarlo sin tener que copiar los datos a otra ubicación en el disco. Además de ser óptimo para casos de uso en los que las tareas de escritura son concurrentes al ser más eficiente ya que no es necesario realizar transformaciones de datos adicionales durante la escritura, aunque posee una menor tolerancia al fallo ya que en caso de que el archivo de logs se corrompa puede generar pérdida de las versiones de los datos.

A continuación se muestra el Timeline de los principales archivos al realizarse las distintas tareas de escritura:

Acción Archivo base Archivo delta Archivo de índice Snapshot
Nuevo registro
Se escribe el registro en el archivo base
No se crea un archivo delta
Se actualiza el archivo de índice con el nuevo registro
No se crea un nuevo snapshot
Actualización de registro existente
Se escribe el registro actualizando en un nuevo archivo delta
Se escribe el registro actualizando en el archivo delta correspondiente
Se actualiza el archivo de índice con la versión actualizada del registro
No se crea un nuevo snapshot
Eliminación de registro
No se elimina el registro del archivo base
Se escribe una marca de eliminación en un nuevo archivo delta
Se actualiza el archivo de índice con la marca de eliminación
No se crea un nuevo snapshot
Compactación de archivos delta
Se fusionan los archivos delta en un nuevo archivo base
Se crea un nuevo archivo delta que contiene las actualizaciones pendientes después de la última compactación
Se crea un nuevo archivo índice que contiene todas las entradas del índice de los archivos fusionados
Se crea un nuevo snapshot que refleja el estado actual de los datos después de la compactación

Como resumen, se realiza una comparativa de las principales métricas de performance entre Copy on-Write y Merge on-Read:

COW MOR
Coste de escritura
Mayor
Menor
Latencia
Mayor
Menor
Rendimiento de consulta
Mayor
Menor antes de compactación
Igual tras compactación
  • Escritura: COW tiene un mayor costo de escritura que MOR debido a que cada vez que se realiza una operación de escritura (ya sea añadir un nuevo registro o actualizar uno existente), se crea un nuevo delta file y se deben actualizar los archivos de índice correspondientes. En cambio, en MOR, los registros se escriben directamente en el base file, lo que implica una menor cantidad de operaciones de escritura y, por lo tanto, un menor costo en términos de rendimiento y uso de recursos.
  • Latencia: COW tiene un menor data latency que MOR debido a que los registros nuevos o actualizados se escriben primero en un delta file separado, en lugar de actualizar directamente el base file como en MOR.
  • Tiempos de consulta: COW tiene un menor tiempo de consulta que MOR debido a que en COW, los datos actualizados se almacenan en los Delta Files y los datos originales se mantienen en el Base File. Esto significa que no es necesario realizar ninguna operación de lectura para obtener la versión actualizada de los datos.

Hudi no solo ofrece distintas formas de almacenamiento, sino también, distintas formas de realizar consultas sobre la información almacenada, dependiendo de nuevo tanto de los casos de negocio como del tipo de almacenamiento escogido:

  • Snapshots: consulta la última versión procedente de un commit o compactación. Gracias a este tipo de consultas, se pueden obtener las versiones de los datos en momentos específicos gracias a la combinación del base y delta file (time travel). Misma performance en CoW y MoR.
  • Read Optimized: únicamente disponible si el tipo de tabla en el que se almacenan los datos es MoR. Basado en la obtención de vistas optimizadas para lectura de un conjunto de datos grande y distribuido. Esto se consigue mediante indexación optimizada (Bloom Filter Index), lo que permite reducir considerablemente el tiempo de búsqueda de datos. Además se apoya también en la compactación de datos que hace que, de nuevo, las tareas de búsqueda sean menos costosas al disminuir el volumen de los mismos.
  • Incremental: Permite leer solo los datos actualizados o agregados desde la última consulta. Esto ayuda a reducir el tiempo de lectura y el uso del almacenamiento en disco.

Conclusiones

En este artículo se ha descrito como desplegar una aplicación que ingesta eventos en tiempo real y forma con la salida un LakeHouse con una arquitectura serverless. Con esto se ha buscado un nivel de abstracción intermedio de tal manera que sea una aplicación simple pero con la suficiente potencia para poder llegar a utilizarse en entornos productivos reales.

Desplegar aplicaciones basadas en la combinación de tecnologías como son Apache Flink y Hudi otorga la capacidad de procesar grandes volúmenes de datos en tiempo real y de manera escalable. Esto combinado con la garantía que aportan las transacciones ACID, hace que la combinación de Apache Flink y Apache Hudi sea una solución sólida para la ingesta y procesamiento de datos en entornos críticos.

A pesar de todas las ventajas que se han descrito cabe resaltar algunos inconvenientes que se han podido detectar desarrollando esta arquitectura. El mayor problema que se ha encontrado ha sido la resolución de dependencias entre las librerías de Flink y los conectores necesarios, como por ejemplo el de Hudi. La falta de comunidad que existe a día de hoy, aunque esta crecerá con el paso del tiempo, supuso un problema inicial considerable para poder formar el paquete final con todas las dependencias necesarias sin que hubiese conflictos entre sí. Además, cabe resaltar que se ha percibido menos comunidad para el lenguaje de Python que para el de Java o Scala. En este artículo se eligió Python ya que existía un conocimiento interno más fuerte pero en el caso de que el stack tecnológico se acerque más a lenguajes soportados por la JVM (Java Virtual Machine) sería aconsejable el uso de Scala o Java.

En los próximos artículos entraremos más en detalle en las particularidades que tienen tanto Hudi como Flink para poder personalizar y ajustar el comportamiento de esta aplicación dependiendo de las necesidades que presente nuestro caso de uso.

Referencias

[1] Repositorio Github Flink-Hudi (Terraform). [link]

[2] Greenlet 2.0.2. Documentation [link] (February 28, 2023)

[3] Amazon Kinesis Data Analytics Costs. [link] (March 23, 2022)

[4] Hudi Optimized Indexing. [link] (September 23, 2021)

[5] Hudi Writing Concurrency. [link] (September 23, 2021)

Autores

Alberto Jaen

AWS Cloud Engineer

Empecé mi carrera laboral con el desarrollo, mantenimiento y administración de bases de datos multidimensionales y Data Lakes. A partir de ahí comencé a estar interesado en plataformas de datos y arquitecturas cloud, estando certificado 3 veces en AWS y 2 con Hashicorp.

Actualmente me encuentro trabajando como un Cloud Engineer desarrollando Data Lakes y DataWarehouses con AWS para un cliente relacionado con la organización de eventos deportivos a nivel mundial.

Alfonso Jerez

AWS Cloud Engineer

Comencé mi carrera como Data Scientist en distintos sectores (banca, consultoría,…) enfocado en la automatización de procesos y desarrollo de modelos. En los últimos años aposté por Bluetab motivado por el interés en especializarme como Data Engineer y comenzar a trabajar con los principales proveedores Cloud (AWS, GPC y Azure) en clientes como Olympics, específicamente en la optimización del procesamiento y almacenamiento del dato.

Colaborando activamente con el grupo de Práctica Cloud en investigaciones y desarrollo de blogs de tecnologías punteras e innovadoras tales como esta, fomentando así el continuo aprendizaje.

Adrián Jiménez

AWS Cloud Engineer

Dedicado al aprendizaje constante de nuevas tecnologías y su aplicación, disfrutando de utilizarlas en la resolución de desafíos tecnológicos. Desarrollo mi carrera como Cloud Engineer diseñando, implementando y manteniendo infraestructura en AWS.

Colaboro activamente en la Práctica Cloud, donde investigamos y experimentamos con nuevas tecnologías, buscando soluciones para los retos que enfrentan nuestros clientes.

Navegación

¿Quieres saber más de lo que ofrecemos y ver otros casos de éxito?
DESCUBRE BLUETAB

SOLUCIONES, SOMOS EXPERTOS

DATA STRATEGY
DATA FABRIC
AUGMENTED ANALYTICS

Te puede interesar

Análisis de vulnerabilidades en contenedores con trivy

noviembre 16, 2020
LEER MÁS

Creando una comunidad dataHub en tu organización

septiembre 15, 2022
LEER MÁS

Bluetab se certifica como AWS Well Architected Partner Program

octubre 19, 2020
LEER MÁS

Gobierno de Datos: ¿tendencia o necesidad?

octubre 13, 2022
LEER MÁS

Bluetab en la ElixirConfEU 2023

mayo 3, 2023
LEER MÁS

Detección de Fraude Bancario con aprendizaje automático II

septiembre 17, 2020
LEER MÁS

Publicado en: Blog, Practices, Tech

Snowflake: Zero-Copy clone, o cómo librarte del duplicado de datos al clonar.

marzo 22, 2023 by Bluetab

Snowflake: Zero-Copy clone, o cómo librarte del duplicado de datos al clonar.

Roberto García Parra

Technical Delivery Manager

Gabriel Gallardo Ruiz

Enterprise Architect

Introducción

Cómo continuación a la serie de artículos que estamos haciendo sobre las funcionalidades avanzadas que se derivan de la forma en la que se almacenan los datos en Snowflake, presentamos este nuevo artículo sobre el Zero-copy clone, que permite mediante diferentes operaciones a nivel metadato poder tener diferentes copias o versiones de la información, sin tener que duplicar datos en la mayoría de las ocasiones.

¿Qué es Zero-Copy Clone?

Uno de los casos de uso más frecuente que implica gran consumo de tiempo, recursos y almacenamiento, especialmente si hablamos de grandes dataset, es el copiado de datos. Para la realización de copias de objetos, snowflake ofrece zero-copy clone. Esta operación se realiza sobre la metadata, lo que permite realizar clonado de objetos rápidamente sin tener que duplicar los datos.

¿Cómo funciona?

Snowflake realmente lo que realiza es una copia de la metadata asociada al objeto que se va a clonar. Como podemos ver en el ejemplo de clonación de la tabla ‘Events’ en la siguiente imagen, simplemente duplica la metadata sin realizar ningún cambio en la parte de almacenamiento.

Una vez realizado el clon, los objetos clonados tienen su propio ciclo de vía, lo que permite que se puedan realizar cambios sobre los datos sin afectar al objeto original, de igual modo los cambios realizados sobre el objeto original tampoco serán reflejados sobre el objeto clonado.

Zero-copy clone permite la realización de clones prácticamente de cualquier objeto de Snowflake siendo especialmente útil en bases de datos, esquemas y tablas.

¿Qué coste tiene?

Al tratarse de una operación exclusiva de metadata, no se repercuten costes ni de procesamiento ni de almacenamiento, ni siquiera es necesario realizar la operación con un virtual data warehouse activo.

¿Cómo se puede clonar una tabla?

Privilegios: Para poder clonar una tabla, el ROLE que va a realizar la clonación tiene que tener privilegios de SELECT sobre la tabla que se va a clonar, además como es lógico, privilegios de CREATE TABLE sobre el esquema destino en el que se va a crear el clon de la tabla.

Sentencia: La sentencia utilizada para la clonación de tablas es similar a la de creación pero añadiendo la cláusula CLONE. A continuación, vamos a clonar la tabla “events»:

USE ROLE INGESTA_HUB_ROLE;
USE SCHEMA WEATHER.HISTORICAL;
CREATE TABLE EVENTS_CLONE CLONE EVENTS;

Podemos comprobar que la clonación de la tabla se realiza de inmediato, ya que como se comentó anteriormente únicamente se opera sobre la metadata.

Además, podemos observar en la siguiente tabla que todas las propiedades de la tabla origen se han clonado en la nueva tabla. Únicamente en el caso en que la tabla origen tenga asignado una cluster key, la nueva tabla se creará con automatic_clustering suspendido.

EVENTS EVENTS
cluster_by
LINEAR (COUNTRY,CITY)
LINEAR (COUNTRY,CITY)
rows
7,479,165
7,479,165
bytes
105,110,528
105,110,528
owner
INGESTA_HUB_ROLE
INGESTA_HUB_ROLE
retention_time
30
30
automatic_clustering
ON
OFF
change_tracking
OFF
OFF
search_optimization
OFF
OFF
is_external
N
N

Con respecto a los privilegios, por defecto no serán clonados. Esto lo podemos comprobar con las sentencias siguientes:

SHOW GRANTS ON TABLE WEATHER.HISTORICAL.EVENTS;

SHOW GRANTS ON TABLE WEATHER.HISTORICAL.EVENTS_CLONE;

Para que se clonen los privilegios asignados a la tabla origen, hay que añadir COPY GRANTS en la sentencia de clonado:

CREATE TABLE EVENTS_CLONE_1 CLONE EVENTS COPY GRANTS;

Ahora podemos comprobar que los privilegios han sido clonados:

SHOW GRANTS ON TABLE WEATHER.HISTORICAL.EVENTS_CLONE;

Clonación usando time travel

Snowflake permite realizar la clonación de una tabla para un momento histórico determinado, para ello tendremos que utilizar la cláusula AT o BEFORE en la sentencia de clonado.

Para la ejecución de la prueba, vamos a hacer cambios en la tabla de EVENTS y después realizaremos el clonado con un time travel anterior al cambio.

DELETE FROM EVENTS WHERE AIRPORTCODE=’KS47′;

Clonamos la tabla con un time travel anterior a la realización del borrado

CREATE TABLE EVENTS_CLONE_TIME_TRAVEL CLONE EVENTS at (offset => -60*5);

Si consultamos la información referente a ambas tablas, podemos comprobar que el clonado se ha realizado en el momento anterior en el que la tabla EVENTS tenía 9.062 filas más.

Consideraciones del clonado de tablas

  • Actualmente las tablas externas no pueden ser clonadas.
  • La tabla clonada tiene su propio ciclo de vida con lo que no tiene acceso a los datos históricos de la tabla origen utilizando time travel.
  • Una tabla clonada no incluye el historial de carga(LOAD_HISTORY) de la tabla de origen.
  • Si se clona una tabla con una secuencia asignada como valor por defecto a una columna, ésta seguirá referenciando a la secuencia original. En el caso de clonación de base de datos o esquemas que contengan tanto la secuencia como la tabla, la columna referenciará a la secuencia clonada (esto lo veremos con un ejemplo en la parte de clonado de Esquemas y Bases de dato)
  • Si clonamos una tabla que contiene a una foreign key, esta seguirá haciendo referencia a la tabla con al primary key. Como pasaba en el caso de las secuencias, si la clonación se realiza sobre un esquema o una base de datos y contiene ambas tablas, las referencias se realizan sobre las clonadas. En el caso de que la referencia de la foreign key sea sobre otra base de datos, seguirá realizándose sobre la tabla que contiene la primary key.

¿Cómo realizar la clonación de Esquemas y Base de datos?

Privilegios: Para poder clonar una base de datos o un esquema en Snowflake el role que va a realizar la operación tiene que tener permisos USAGE sobre los objetos que se van a clonar y los privilegios adecuados para la creación de los objetos en el destino.

La realización de clonado de un esquema o de una base de datos se realiza de manera recursiva, clonando todos los objetos hijos con la única excepción de las tablas externas , stages internos y snowpipes internos que no serán clonados.

A diferencia de la clonación de tablas, cuando se realiza la clonación de un esquema o una base de datos todos los permisos son heredados, por tanto, todos los objetos de la base de datos o del esquema clonado tendrán los mismos privilegios que tenían en el original.

Sentencia

USE ROLE ACCOUNTADMIN;
USE DATABASE WEATHER;
CREATE SCHEMA HISTORICAL_CLON CLONE HISTORICAL;

Al igual que sucedía con la clonación de tablas, la operación de clonado se realiza únicamente sobre la metadata, lo que permite que se realice en un tiempo reducido y sin necesidad de tener un virtual warehouse activo.

Para comprobar que la clonación se ha realizado de la forma esperada, podemos observar los objetos de cada una de los esquemas. Comprobamos tablas e internal stages.

SHOW TABLES IN HISTORICAL;

SHOW TABLES IN HISTORICAL_CLON;

Observamos que las tablas del esquema original y del clonado son iguales, además, se han heredado tanto owner como resto de propiedades. Como sucedía en el caso de la clonación de tablas, automatic_clustering está desactivado en las tablas del esquema clonado.

A continuación, vamos a comprobar que los internal stage del esquema original no se han clonado en el nuevo esquema

SHOW STAGES IN HISTORICAL;

SHOW STAGES IN HISTORICAL_CLON;

Clonación usando time travel

Como sucedía con el clonado de tablas, Snowflake también permite realizar el clonado de bases de datos y esquemas usando la opción de time travel.

En este caso vamos a realizar la clonación de la base de datos en un tiempo anterior a la clonación del esquema “HISTORICAL” que hemos realizado anteriormente.

CREATE DATABASE WEATHER_CLONE CLONE WEATHER at (offset => -60*60);

SHOW SCHEMAS IN WEATHER;

SHOW SCHEMAS IN WEATHER_CLONE;

Podemos comprobar en la base de datos clonada que no se encuentra el esquema que hemos clonado anteriormente.

Secuencias y foreign key:

 Como se comentó anteriormente en el clonado de tablas, si se clona un esquema que contiene una tabla con una columna con un valor por defecto de una secuencia o una foreign key y están en el mismo esquema o base de datos, la referencia de la secuencia apuntará a la misma referencia en el esquema o base de datos clonada.

Se ha añadido al esquema “HISTORICAL” una tabla “event_temperature” que contiene una secuencia y una foreign key a otra tabla. Se realiza la clonación:

CREATE SCHEMA HISTORICAL_CLON_2 CLONE HISTORICAL;

Si se observa la definición de la table, podemos comprobar cómo se ha cambiado la referencia tanto de la secuencia como de la foreign key.

Consideraciones del clonado de esquemas y bases de datos

  • Para el clonado de esquemas y bases de datos hay que tener en cuenta las mismas consideraciones observadas en la parte de las tablas.
  • Cuando se clona una base de datos o un esquema que contiene tareas, las tareas del clon se suspenden de forma predeterminada.
  • La clonación es rápida, pero no instantánea, especialmente para objetos grandes. Por tanto, si se ejecutan comandos de DDL en los objetos de origen mientras la operación de clonación está en curso, es posible que los cambios no sean reflejados en objeto clonado.

Conclusiones

Como vimos también en los artículos anteriores, Snowflake nos ofrece muchas características avanzadas, es muy importante comprender el funcionamiento de cada una de ellas para poder sacar el máximo partido siendo este el objetivo principal de esta serie de artículos. En este caso, comprender correctamente el clonado de datos nos va a ayudar a poder utilizar esta característica de manera correcta cuando sea necesario como puede ser en la creación de entornos de prueba o en la realización de snapshot.

Finalmente, hay que destacar que Snowflake nos ofrece un potente mecanismo de clonado de objetos, permitiéndonos la clonación de una forma sencilla, apenas incurriendo en costes y sin duplicación de datos. Estas características pueden ser muy importantes cuando vayamos a seleccionar un datawarehouse para nuestro entorno analitico.

Navegación

Introducción

¿Qué es Zero-Copy Clone?

¿Cómo funciona?

¿Qué coste tiene?

¿Cómo se puede clonar una tabla?

Clonación usando time travel

Consideraciones del clonado de tablas

¿Cómo realizar la clonación de Esquemas y Base de datos?

Clonación usando time travel

Consideraciones del clonado de esquemas y bases de datos

Conclusiones

Autores

¿Quieres saber más de lo que ofrecemos y ver otros casos de éxito?
DESCUBRE BLUETAB

Roberto García Parra

Technical Delivery Manager

Gabriel Gallardo Ruiz

Enterprise Architect

SOLUCIONES, SOMOS EXPERTOS

DATA STRATEGY
DATA FABRIC
AUGMENTED ANALYTICS

Te puede interesar

Bluetab se incorporará a IBM

julio 9, 2021
LEER MÁS

Tenemos Plan B

septiembre 17, 2020
LEER MÁS

Databricks sobre AWS – Una perspectiva de arquitectura (parte 2)

enero 27, 2022
LEER MÁS

Data Mesh

julio 27, 2022
LEER MÁS

MODELOS DE ENTREGA DE SERVICIOS EN LA NUBE

junio 27, 2022
LEER MÁS

Detección de Fraude Bancario con aprendizaje automático

septiembre 17, 2020
LEER MÁS

Publicado en: Blog, interes, Practices, Tech

Snowflake, el Time Travel sin DeLorean para unos datos Fail-Safe.

febrero 23, 2023 by Bluetab

Snowflake, el Time Travel sin DeLorean para unos datos Fail-Safe.

Roberto García Parra

Technical Delivery Manager

Gabriel Gallardo Ruiz

Senior Data Architect

Introducción a Snowflake

Este artículo supone una continuación del artículo inicial que hicimos sobre el almacenamiento en Snowflake, y será el primero de una serie donde entraremos a fondo en las características más diferenciadoras de Snowflake. El primer artículo se puede consultar aquí.

Recordar que una de las características principales del almacenamiento en Snowflake es la inmutabilidad de los archivos: Cuando hay una operación DML sobre una tabla, los ficheros donde están los datos nunca se modifican, sino que se van creando nuevas versiones de los mismos, archivando todas las versiones anteriores por las que han ido pasando los ficheros durante el tiempo de retención establecido en el parámetro DATA_RETENTION_TIME_IN_DAYS parámetro que se puede establecer a nivel base de datos, esquema o tabla.

Este archivado es lo que posibilita las dos funcionalidades avanzadas de Snowflake que se van a ver en este artículo: El Time Travel y el Fail-Safe.

¿Qué es el Time Travel?

El Time Travel es una funcionalidad que permite acceder a versiones históricas por las que han ido pasando los datos en las tablas. Por ejemplo, si tenemos un proceso de carga diaria de una tabla de movimientos contables, podríamos lanzar una consulta de cuál era el estado de los movimientos contables tres días atrás.

¿Qué es el Fail-Safe?

Es un periodo adicional de siete días por el que Snowflake almacena las versiones de los datos para una posible recuperación. Este periodo no es configurable, siempre es de siete días, y únicamente aplica en un tipo de tablas: Las permanentes. 

Los objetos con Fail-Safe son las bases de datos, esquemas y tablas.

¿Qué se puede hacer con el Time Travel?

  • Consultar una foto estática de cualquier momento del pasado hasta un máximo de 90 días. Por ejemplo, de una tabla de movimientos contables, podríamos sacar un balance con los movimientos congelados a una fecha.
  • Recuperar tablas que se hayan borrado accidentalmente de forma muy sencilla mediante un simple comando SQL (UNDROP).
  • Recovery point-in-time: Recuperar datos en un punto concreto, dentro del plazo de los 90 días máximo del time travel.
  • Poder sacar snapshots de los datos para guardarlos permanentemente → Para esto podríamos combinar dos funcionalidades: El time travel y el zero-copy cloning, que veremos más adelante.

¿Cómo utilizar el Fail-Safe?

El Fail-Safe permite recuperar datos hasta siete días máximo después de la expiración del Time Travel. Esta recuperación solamente puede ser hecha a través del equipo de soporte de Snowflake, a diferencia del Time Travel, y se debe hacer vía petición. El Fail-Safe es un mecanismo para poder recuperar datos en caso de emergencia, no está pensado para hacer queries históricas, etc. para eso hay que usar el Time Travel.

No hay un SLA asociado a la recuperación de datos en Fail-Safe: Snoflake habla de horas incluso días para recuperar estos datos.

¿Cómo se configura el Time Travel?

Es un servicio que nos proporciona Snowflake y no hay que hacer nada adicional, más allá de configurar el número de días que queremos que nuestros objetos lo tengan activo. Hay que tener en cuenta lo siguiente:

  • Dependiendo de la edición que tengamos contratada de Snowflake, el número de días permitido de Time Travel puede diferir. A día de hoy, en la edición Standard solamente se puede habilitar hasta un día de Time Travel, mientras que a partir de la edición Enterprise podemos habilitar hasta 90 días de Time Travel.
  • El Time Travel de hasta 90 días solamente está habilitado en las tablas permanentes. Resto de tablas, un día máximo de Time Travel. Si quieres saber más sobre los tipos de tablas, hablamos sobre ellas en nuestro anterior artículo sobre almacenamiento, en la sección DML’s en Snowflake. El parámetro que configura el número de días de Time Travel en las tablas es el DATA_RETENTION_TIME_IN_DAYS. Este valor está por defecto a 1, pero podemos especificar un valor distinto a nivel base de datos o esquema, para que todos los objetos por debajo hereden dicho valor. También es posible configurar un tiempo mínimo de retención a nivel de cuenta, mediante el parámetro MIN_DATA_RETENTION_TIME_IN_DAYS. Este parámetro solamente es configurable por el rol ACCOUNTADMIN, y en caso de tener un valor, el tiempo de retención de una tabla sería el máximo del valor MIN_DATA_RETENTION_TIME_IN_DAYS a nivel cuenta y el DATA_RETENTION_TIME_IN_DAYS de la propia tabla.
  • Si queremos deshabilitar el TIME TRAVEL, simplemente tenemos que establecer un valor cero al parámetro DATA_RETENTION_TIME_IN_DAYS.

¿Cómo se configura el Fail-Safe?

El Fail-Safe no es configurable. Es un periodo fijo de siete días que se activa automáticamente en tablas permanentes sin necesidad de intervención alguna por parte del usuario, una vez que finaliza el periodo de Time Travel, o si se reduce este periodo, y hay datos con antigüedad superior al nuevo periodo definido, los cuales pasarían también automáticamente a Fail-Safe.

Consideraciones a tener en cuenta en el Time Travel y el Fail-Safe

¿Es posible modificar el Time Travel de un objeto?

Sí, es posible, pero hay que tener en cuenta el impacto que tiene dicha modificación:

  • Si se incrementa, la extensión solamente afecta a datos que estén archivados en ese momento, no así a datos que ya hayan pasado a Fail-Safe. Imaginemos que tenemos una tabla con un Time-Travel de 5 días y la modificamos a 10 días, los datos dentro de los 5 días sí se les extendería su periodo a 10, pero los datos con una antigüedad mayor a 5 días que hayan pasado al Fail-Safe, seguirían en el Fail-Safe, incluso si solo ha pasado por ejemplo un día desde que están en el Fail-Safe.
  • Si se disminuye, solamente los datos dentro del nuevo periodo de Time Travel permanecen ahí, mientras que el resto pasa a Fail-Safe. Si reducimos por ejemplo de 20 días a dos días, solamente se mantendrán los datos que se hayan generado en estos últimos dos días, mientras que los datos con antigüedad mayor o igual a 3 días pasan a Fail-Safe.

La modificación del Time Travel de un objeto se hace mediante una sentencia ALTER TABLE, modificando el parámetro DATA_RETENTION_TIME_IN_DAYS al nuevo tiempo en días deseado.

¿Qué pasa cuando el periodo de retención de un contenedor y un objeto chocan y el contenedor es borrado?

El contenedor se refiere a un objeto Snowflake que a su vez contiene 1..n objetos. Dos claros ejemplos son una base de datos, que a su vez contiene 1..n esquemas, y un esquema que a su vez contiene 1..n objetos de esquema tales como tablas, vistas o procedimientos almacenados entre otros.

Cuando una base de datos o esquema tiene definido un periodo de retención, y los objetos hijos tienen definidos un periodo de retención propio, cuando se borra el contenedor padre todo lo que esté contenido se retiene por el periodo definido en el padre, incluso si algunos de los objetos hijo tiene su propio periodo de retención y es diferente al del padre.

Esto quiere decir que si tenemos una base de datos con un periodo de retención de 5 días, y uno de los esquemas contenidos tiene definido un periodo de 10 días, si hay un borrado de la base de datos solamente tendríamos 5 días para recuperar no solo la base de datos sino también cualquiera de los esquemas. Esto aplica también a cuando tenemos un periodo de retención a nivel de objetos, y borramos el esquema que los contiene. En ese caso, el periodo de retención que cuenta siempre es el del esquema.

Si se desea mantener un periodo de retención diferente para alguno de los hijos, estos deben ser borrados previamente a la eliminación del contenedor. Por ejemplo, se borran primero las tablas en las que quiero mantener su periodo propio de retención, y posteriormente se borra el esquema.

Costes del Time Travel y el Fail-Safe

El Time Travel y el Fail Safe aumentan nuestra factura de almacenamiento. Todas las versiones históricas que se vayan archivando de nuestros datos, ocupan un almacenamiento que tendremos que pagar, aunque hay que tener en cuenta que Snowflake, cómo vimos en el artículo de almacenamiento, gestiona esto de la manera más eficiente posible, con lo que si por ejemplo, modificamos datos que afectan a una única micropartición, solo esta micropartición es archivada, pero no archivaría microparticiones no afectadas por la modificación.

Hay que tener cuidado en los siguientes supuestos, que sobre todo en tablas de alto volumen, pueden incrementar considerablemente los costes:

  • Truncados-borrados e inserciones continuos en tablas de alto volumen. Imaginemos que tenemos una tabla de varios gigas, que continuamente borramos y volvemos a cargar. En estos casos, cada vez que hiciéramos esa operación de borrado-inserción, estaríamos archivando varios gigas de tabla, y eso si se multiplica varias veces por el número de días, puede ser importante en la factura.
  • Actualizaciones masivas de datos con frecuencia. Imaginemos que tenemos un proceso que actualiza una columna después de cada inserción. Esto también generaría el archivado de toda la tabla entera.
  • Drops de tablas. Por el mismo motivo que un truncate, esto genera que se archive la tabla completa. Si hacemos continuos drops y recreaciones de la tabla con datos nuevos, una tabla permanente puede disparar los costes de almacenamiento.

Se recomienda para controlar los costes derivados del Time Travel y el Fail-Safe lo siguiente:

  • Si tenemos tablas que son fácilmente reproducibles desde fuera de Snowflake, mejor utilizar tablas transitorias que permanentes. De esta manera, nos ahorraremos los siete días de Fail-Safe y como máximo tendremos un día de Time Travel. Por ejemplo, tablas de lookup, o tablas de apoyo-staging para ciertos procesos ETL’s que no son esenciales. En este último caso, si no es necesario que la tabla persista más allá de la vida de la sesión, se puede configurar incluso como tabla temporal y ahorrar más, ya que en cuanto termina la sesión la tabla desaparece y no se puede recuperar.
  • Las tablas de hechos normalmente deberían ser tablas permanentes, pero si de igual manera las podemos recuperar fácilmente desde el sistema origen en caso de desastre, nos podemos plantear generar algunas como transitorias, y sacar backups periódicos con zero-copy cloning, característica que también se desarrollará en este artículo.

¿Cómo utilizar el Time Travel? Casos de uso prácticos

En nuestro ejemplo, tenemos una tabla donde se carga un stock diario. Lo que hemos hecho, ha sido el día 10 de noviembre cargar el stock de esa fecha, y el día 11 de noviembre hemos machacado el stock del 10 de noviembre por el actual a 11 de noviembre. Fijamos un Time Travel de treinta días a nivel base de datos (que es el que aplicaría por defecto a los objetos por debajo). Pasan 19 días desde la última carga.

Casos de uso que se plantean:

  • Un usuario quiere recuperar mediante una consulta la foto del 10 de noviembre.
  • Por error, uno de nuestros analistas borró la tabla. Es necesario recuperar el stock que teníamos de producto lo más rápido posible.
  • Un usuario nos pide que guardemos una foto del estado del stock a 10 de noviembre, por si nos lo piden en alguna auditoría.
  • Un analista necesita actualizar el stock de un producto concreto en el día 11 de noviembre, pero se equivoca y actualiza todos los productos. Restaurar la tabla al punto de antes del error.

Partimos ya de un stage interno creado en Snowflake donde hemos volcado los ficheros del 10 y el 11 de noviembre, y lanzamos el COPY INTO para insertarlos en la tabla cada día.

Primer caso de uso: Consulta de un estado anterior de la tabla

Si hacemos una consulta sobre la tabla, lo que obtenemos es el stock a día 11 de noviembre:

Para el usuario poder consultar la información a 10 de noviembre en esta tabla, tendría tres opciones:

  • Consulta con un timestamp fijo. Es decir, consultamos la tabla tal cual estaba en un momento específico del tiempo. En nuestro caso, la consultamos a 10 de noviembre:
  • Mediante un offset en segundos. Aquí lo que hacemos es decir que queremos consultar la información al estado de hace 19 días (cuando hacemos la consulta es 29 de noviembre, y queremos los datos del 10 de noviembre). Para ir 19 días hacia atrás, como el offset es en segundos, multiplicamos 60*60*24 (con esto pasamos los segundos a días) y por 19 (que son los días que queremos viajar hacia atrás):
  • Con un ID de query. Ojo con esta opción porque también puede dar problemas. En nuestro caso, cuando la ejecutamos, da el siguiente error:

Nos cercioramos de que ese ID de query sí que existe en el historial completo (Base de datos SNOWFLAKE, esquema ACCOUNT_USAGE, tabla QUERY_HISTORY:

Vemos que el ID es correcto y es justo cuando hicimos el truncate de la tabla para borrar los datos del día 10. El motivo por el que creemos que viene el error es porque el detalle del historial de queries solamente se guarda durante 14 días, con lo cual, este método no es recomendable para lanzar consultas pasado este periodo. Aunque nuestro Time Travel sea mayor (como en este caso, 30 días) el detalle de datos de la query no es accesible.

Segundo caso de uso: Recuperación de una tabla borrada por error

Imaginemos que algún usuario de manera accidental borra del todo la tabla:

drop table stock_diario

Los usuarios empiezan a quejarse que hay aplicaciones que han dejado de funcionar, tardaríamos bastante tiempo en reprocesar el archivo en origen, dependemos de un equipo que nos lo haga…

Snowflake facilita la recuperación de una tabla borrada durante el tiempo del Time Travel con una simple instrucción. Undrop la cual al ser una operación de metadata se ejecuta inmediatamente. No es necesario tener que localizar un backup donde estaba esa tabla ok, restaurarlo, sacar la tabla… simplemente ejecutar esta sentencia.

Demostración a continuación, borramos la tabla:

Ejecutamos una query y nos devuelve el siguiente error:

Ejecutamos la sentencia undrop:

Y vemos que Snowflake nos devuelve el mensaje de que la tabla ha sido correctamente restaurada.

Y comprobamos que podemos volver a hacer queries. Por supuesto, el Time Travel después de la recuperación se mantiene, pudiendo también consultar fotos anteriores de la tabla tal y como vemos en la captura:

Importante a tener en cuenta: El UNDROP siempre restaura la última versión de los datos que hubiese en el momento del borrado.

Tercer caso de uso: Sacar una foto estática de un estado de la tabla

Ya se ha visto que durante el periodo de Time Travel podemos consultar el estado anterior de una tabla. Pero, ¿y si un usuario pidiera guardar el estado de esa tabla de forma permanente? Este caso de uso es frecuente en el mundo financiero y de la auditoría para cosas tales como poder sacar un estado de cuentas con los movimientos a una determinada fecha, o que un regulador nos pida sacar instantáneas de los datos a determinados momentos para una consulta posterior.

La opción más inmediata para satisfacer este requerimiento sería combinar las funcionalidades de zero-copy cloning y time travel. Las ventajas que nos ofrece esta opción sería:

  • No duplicamos almacenamiento por la instantánea. Durante el tiempo de Time Travel, tenemos un único fichero, y nuestro clon apuntaría a esa versión de los datos. Cuando el Time Travel expire, Snowflake sabrá que hay un clon apuntando a esos datos y por tanto no los borrará. Si lo hiciésemos insertando los datos en una nueva tabla, durante el Time Travel de esa versión de los datos se estaría duplicando el almacenamiento.
  • Creamos todo en una simple sentencia.

A continuación se muestra el clonado de nuestra tabla de stock con la foto del 10 de noviembre:

Imaginemos que pasa el time travel de esta tabla. Podemos simularlo haciendo un ALTER TABLE y poniendo la tabla a 10 días (han pasado más de 10 días desde la última modificación):

Si se intenta sacar la foto a 10 de Noviembre desde la tabla original, Snowflake devuelve el siguiente error:

Ya que ese estado de los datos tenían una antigüedad mayor a 10 días, Snowflake lo ha llevado directamente a Fail-Safe.

Si consultamos el clon que se acaba de generar:

Se ve que a pesar de que el Time Travel ha expirado, mantenemos la foto del 10 de noviembre, y esta foto persistirá salvo que borremos el clon.

Cuarto caso de uso: Restaurar la tabla a un estado anterior

Imaginemos que le piden a un usuario actualizar el stock de impresoras de 15 a 14 unidades. Para ello el usuario genera la siguiente consulta:

</