Spark — 7ma nota: Manejo de archivos JSON
JSON o JavaScript Object Notation es un formato de texto muy popular usado dentro del mundo del desarrollo de software, para el intercambiar grandes volúmenes de datos. Esta notación de texto, se creó tomando como base un sub-lenguaje de JavaScript. Sin embargo, luego de ser muy bien acogida por los desarrolladores de software como una alternativa al uso de XML, fue tomando una creciente popularidad en la comunidad de desarrolladores. Es por este motivo que, a partir del 2009 se considera como un formato de texto independiente al lenguaje JavaScript. En este artículo comentaré algunas de las formas que encontré para manejar este tipo archivos dentro de Spark.
Temas:
- Uso de la librería JSON
- Manejando archivos JSON en Spark
- Uso de pyspark sql para manipular archivos JSON
USO DE LA LIBRERÍA JSON
En la actualidad, en Python es posible usar la biblioteca json que contiene varios métodos con los que es posible manipular un archivo JSON en su totalidad. En el siguiente ejemplo, usaré la página jsonplaceholder que es un repositorio de datos en formato JSON. Está página ofrece datos con distintas estructuras para que sean usados como data de prueba. Para este ejemplo, trabajaré con el conjunto de datos llamado “todos”. A continuación, muestro algunos de los elementos que envía el servicio.
Como se logra apreciar en el bloque de código anterior, el formato de estos datos no es muy complejo ya que, cada elemento cuenta con 4 atributos: userId, id, title y completed. Entonces, para recibir esta información dentro de mi código debo usar también la biblioteca request. A continuación, muestro unas cuantas líneas de código en donde invoco al web service usando Python.
>>> import json>>> import requests>>> response = requests.get("https://jsonplaceholder.typicode.com/todos")>>> ptodos = json.loads(response.text)
Luego de esto, uso el método loads() de la biblioteca json para codificar los datos que son enviados por el web service. A continuación, muestro el tipo de dato de la variable ptodos.
>>> type(ptodos)<class 'list'>
Esta variable es de tipo list por lo que, para acceder a cada elemento es posible iterar sobre ella, como se muestra a continuación.
>>> for todo in ptodos:... print(todo){'userId': 1, 'id': 1, 'title': 'delectus aut autem', 'completed': False}{'userId': 1, 'id': 2, 'title': 'quis ut nam facilis et officia qui', 'completed': False}{'userId': 1, 'id': 3, 'title': 'fugiat veniam minus', 'completed': False}{'userId': 1, 'id': 4, 'title': 'et porro tempora', 'completed': True}...
En este punto, tengo los datos del web service en memoria listos para que sean usados. Ahora, ¿Cómo los uso dentro de Spark? .
MANEJANDO ARCHIVOS PYTHON EN SPARK
Una vez que, los datos se encuentran dentro de la variable ptodos los cargaré a un RDD usando el método parallelize(). Debemos recordar que, este método recibe una colección de datos y los carga a un objeto de tipo RDD. A continuación, muestro el código que usé para cargar los datos.
>>> RDD = sc.parallelize(ptodos)
Luego de esto, es posible aplicar las transformaciones que Spark pone a disposición de los desarrolladores. Por ejemplo recordemos que, dentro de los atributos de cada elemento existe un campo llamado “completed” que es de tipo booleano. Entonces, para el siguiente ejemplo voy a filtrar todos los elementos que tengan “true” en este atributo. Para lograr esto usaré la transformación filter() y map() como se muestra a continuación.
>>> ptodosTrue = RDD.filter(lambda todo: todo["completed"]).map(lambda js: json.dumps(js))
Al aplicar la acción collect() sobre la nueva variable ptodostrue podremos ver los datos que contiene.
>>> ptodosTrue.collect()['{"userId": 1, "id": 4, "title": "et porro tempora", "completed": true}', '{"userId": 1, "id": 8, "title": "quo adipisci enim quam ut ab", "completed": true}', '{"userId": 1, "id": 10, "title": "illo est ratione doloremque quia maiores aut", "completed": true}', '{"userId": 1, "id": 11, "title": "vero rerum temporibus dolor", "completed": true}', '{"userId": 1, "id": 12, "title": "ipsa repellendus fugit nisi", "completed": true}'
...
Entonces, ahora todos los datos que contiene la variable son los registros que tienen “true” en el atributo “completed”. Ahora, si quisiera guardar los datos de este RDD en un archivo simplemente, tendría que usar el método saveAsTextFile() como se muestra a continuación.
>>> ptodosTrue.saveAsTextFile("/susejzepol/CasosEjemplo/Spark/caso3-Archivojson/ptodostrueA")
Dentro de la carpeta tenemos varios files con los datos de la variable distribuidos dentro de ellos.
NOTA: Es importante resaltar que, antes de guardar los datos en el archivo, estos fueron decodificados en la transformación anterior usando el método dumps() dentro de la transformación map(). ¿No se dieron cuenta?, pueden dar una segunda mirada a la transformación, está vez, fíjense en la segunda sección aquí es donde pasa la magia.
Esto es importante debido a que, si los datos no son decodificados antes de ser guardados no estarán en formato JSON. Si no sabes que es codificar o decodificar datos en formato JSON la página RealPython lo explica mejor en esta frase:
Think of it like this: encoding is for writing data to disk, while decoding is for reading data into memory.
USO DE PYSPARK SQL PARA MANIPULAR ARCHIVOS JSON
Ahora que ya sabemos como manipular archivos JSON con Python y Spark. Podemos ir un paso adelante y probar el manejo de archivos JSON con el modulo pyspark sql de Spark. Este modulo nos permite trabajar con archivos en formato JSON sin ningún problema incluso, me atrevería a decir que es más fácil en comparación con el método anterior. Entonces, ¿no sabes que es pyspark sql?. No te preocupes, el concepto es simple, básicamente nos ayuda a trabajar con datos estructurados sin importar el origen de los datos. En efecto, pyspark sql trabaja con tablas de datos llamadas data frames. Por el momento, solo usaremos algunos de los métodos que nos permiten cargar y manipular archivos JSON, pero si necesitan más información del tema pueden revisar el siguiente enlace. Como primer paso debo obtener una instancia del modulo dentro de mi código, para esto usaré la siguiente línea de código.
>>> SqlCtx = SQLContext(sc)
Ahora, aprovecharé los archivos que fueron generados en el ejemplo anterior, recordemos que estos se encuentran distribuidos en varios archivos dentro a una carpeta llamada “ptodostrueA”. Entonces, cargar los datos dentro de un data frame es muy sencillo, solo necesitamos indicar la ruta de la carpeta que contiene todos los archivos. Para codificar los archivos usaré el método read.json de Pyspark Sql.
>>> ptodosSql = SqlCtx.read.json("/susejzepol/CasosEjemplo/Spark/caso3-Archivojson/ptodostrueA/")>>> type(ptodosSql)<class 'pyspark.sql.dataframe.DataFrame'>
Vemos que la variable ptodosSql es un data frame, para mostrar los datos que contiene podemos usar la acción collect() al igual que con los objetos de tipo RDD o podemos también, usar el método show().
>>> ptodosSql.show()+---------+---+--------------------+------+|completed| id| title|userId|+---------+---+--------------------+------+| true|151|accusamus adipisc...| 8|| true|154|rerum non ex sapi...| 8|| true|155|voluptatem nobis ...| 8|| true|156|nam quia quia nul...| 8|| true|157|dolorem veniam qu...| 8|| true|158|debitis vitae del...| 8|...+---------+---+--------------------+------+only showing top 20 rows
Como vemos el interprete de Python nos muestra ahora los datos que contiene en memoria dicha variable. Luego, filtraré algunos de los datos de este data frame. Para lograr esto, es posible usar el método where() o filter() ambos se comportan de la misma forma filtrando datos que cumplan una condición específica. Por ejemplo, usaré el método filter() & where() para seleccionar los registros con el “userId” igual nueve.
>>> ptodosSql.filter(ptodosSql.userId == 9).show()+---------+---+--------------------+------+|completed| id| title|userId|+---------+---+--------------------+------+| true|161|ex hic consequunt...| 9|| true|162|omnis laboriosam ...| 9|| true|169|ea odio perferend...| 9|| true|171|fugiat aut volupt...| 9|| true|175|laudantium eius o...| 9|| true|178|nesciunt itaque c...| 9|| true|179|omnis consequuntu...| 9|| true|180|debitis nisi et d...| 9|+---------+---+--------------------+------+
Ahora con where().
>>> ptodosSql.where(ptodosSql.userId == 9).show()+---------+---+--------------------+------+|completed| id| title|userId|+---------+---+--------------------+------+| true|161|ex hic consequunt...| 9|| true|162|omnis laboriosam ...| 9|| true|169|ea odio perferend...| 9|| true|171|fugiat aut volupt...| 9|| true|175|laudantium eius o...| 9|| true|178|nesciunt itaque c...| 9|| true|179|omnis consequuntu...| 9|| true|180|debitis nisi et d...| 9|+---------+---+--------------------+------+
Ahora que sabemos como aplicar un filtro, voy a registrar esos datos en un nuevo data frame llamado ptodos9. En este caso, para guardar los datos también usaré el métodos saveAsTextFile() como se muestra a continuación.
ptodos9.toJSON().saveAsTextFile("/susejzepol/CasosEjemplo/Spark/caso3-Archivojson/ptodosnueve")
Entonces, luego de aplicar la transformación tengo como resultado una carpeta con los datos distribuidos en varios files.
NOTA: Algo importante a resaltar en la transformación anterior es que, se usa el método toJSON() para decodificar los datos antes de guardarlos en el file. Esto es importante debido a que, al igual que en el ejemplo anterior si no se realiza una codificación los datos no tendrán un formato válido.
CONCLUSIONES
De estas dos formas podemos manipular archivos JSON dentro de Spark para así, aplicar transformaciones sobre los datos y así, explotarlos de distintas formas para obtener resultados asombrosos. Espero que, esta pequeña entrada pueda ayudar a alguien que está intentando iniciar en el mundo de Spark. En una próxima entrada abordaré una introducción al modulo Pyspark sql. En mi cuenta de github pueden encontrar el script.
Sobre el autor:
Jesus López Mesía es Ingeniero de sistemas y consultor BI. Puedes encontrar más artículos suyos en la revista MEDIUM, buscándolo como Jesus Lopez o en Twitter como @susejzepol6.