Command for spark-submit #3

Open

stephenllh opened this issue Apr 21, 2022 · 27 comments
@stephenllh

I have a question regarding the command for spark-submit:

spark-submit \
    --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2 \
    stream_all_events.py

What is the meaning of the package org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2?

@ankurchavda
Owner

https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#deploying

It is a required dependency for Kafka and Spark Streaming to work.

@stephenllh
Author

I see! Can you explain what the colon symbol (:) means?

@ankurchavda
Owner

The colons separate the Maven coordinates: groupId:artifactId:version. So it means we want the spark-sql-kafka-0-10_2.12 artifact (built for Scala 2.12) at version 3.1.2, which should match your Spark version. If you were running Spark 3.1.1, the last value after the : would change to 3.1.1.
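
To make that concrete, here is a minimal sketch of a PySpark job that needs this package on the classpath. It is not the project's stream_all_events.py; the broker address and topic name are assumptions based on the defaults discussed in this thread.

from pyspark.sql import SparkSession

# Launched with: spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2 <script>.py
spark = SparkSession.builder.appName("kafka-read-sketch").getOrCreate()

# Without the spark-sql-kafka package, format("kafka") fails with a
# "Failed to find data source: kafka" error.
raw_events = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")  # assumed broker address
    .option("subscribe", "listen_events")                 # one of the Eventsim topics
    .option("startingOffsets", "latest")
    .load()
)

# Kafka delivers key/value as binary, so cast them to strings before parsing.
events = raw_events.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")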

@stephenllh
Author

Thanks a lot! I actually have another, unrelated question. I have been struggling to figure out how EventSim and Kafka work together to create the 4 topics: listen_events, page_view_events, auth_events, status_change_events. Is that hard-coded, and does it only work for the Million Song Dataset? I also saw that each event has different columns, so you assign a different Spark schema to each.

@ankurchavda
Owner

Right. Eventsim is currently coded to generate events for a music streaming website. You can change that, but it is written in Scala, so only if you have experience with that. The topics are also currently hardcoded. You can check the codebase here

@stephenllh
Author

I see. How did you check the columns and the data types of each Kafka event before defining the schemas?

@ankurchavda
Owner

From the Confluent Control Center UI. It should be available on port 9021 of your VM. You will have to forward that port to your local machine and open it in the browser.
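
For reference, once the fields are visible in Control Center, the Spark side looks roughly like this. A sketch only: the column names and types below are illustrative, not the project's exact listen_events schema.

from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, LongType, DoubleType

# Illustrative fields -- the real list comes from inspecting the topic in Control Center.
listen_events_schema = StructType([
    StructField("ts", LongType(), True),
    StructField("userId", StringType(), True),
    StructField("artist", StringType(), True),
    StructField("song", StringType(), True),
    StructField("duration", DoubleType(), True),
])

# `events` is the streaming DataFrame read from Kafka (see the earlier sketch);
# its value column holds each event as a JSON string.
parsed = events.select(
    from_json(col("value").cast("string"), listen_events_schema).alias("data")
).select("data.*")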

@stephenllh
Author

I didn't know you could check those from the Control Center. Thanks for being so helpful.

@ankurchavda
Owner

No worries. If you check the YouTube video I put out a few days back, you should get an idea of how to navigate the project. You'll find the link in the README.

@stephenllh
Author

Hi. Do you mind answering another question of mine? I hope this is not bothering you.

When I set the environment variable KAFKA_ADDRESS to the external IP, Eventsim somehow couldn't create all 4 topics. The logs suggested that it struggled to find the correct IP address. But when I don't set the variable (so it defaults to localhost, as per your code), it works. Do you have any idea why?

@ankurchavda
Owner

Hey, no worries, happy to answer your questions.

I am not sure why this would happen. Eventsim is running on the same VM as Kafka, so it should be able to reach the broker directly. Eventsim doesn't refer to the KAFKA_ADDRESS variable as such. Have you set the correct IP, and is your 9092 port open?

@ankurchavda
Owner

Your Kafka broker will be available on that port for other applications to connect to.

@stephenllh
Author

My mistake. It was opened automatically. (Sorry, I deleted the comment after I realized that.) Now it works. But let me try again after setting the env variable.

@stephenllh
Author

Yeah, it doesn't work if I set KAFKA_ADDRESS to the external IP of the VM. It works when I unset the env variable.

This is what I see in the terminal:

06:42:20.083 [kafka-producer-network-thread | producer-3] DEBUG o.apache.kafka.clients.NetworkClient - [Producer clientId=producer-3] Initiating connection to node 34.124.249.114:9092 (id: 1 rack: null) using address /34.124.249.114
06:43:20.086 [main] DEBUG o.a.k.clients.producer.KafkaProducer - [Producer clientId=producer-3] Exception occurred during message send:
org.apache.kafka.common.errors.TimeoutException: Topic page_view_events not present in metadata after 60000 ms.

@stephenllh
Author

By the way, I just saw this in the Kafka docker-compose YAML file:

KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://${KAFKA_ADDRESS:-localhost}:9092

So the KAFKA_ADDRESS env variable does affect stuff.

@ankurchavda
Owner

Yes, it does. If you want to connect from an external VM, like the Spark cluster in our case, you need to write to the external IP address instead of localhost. Hence I added the KAFKA_ADDRESS variable in the docker-compose.yml.
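
For reference, that one compose line is doing two jobs (comments added; only the line itself is from the project's docker-compose.yml):

#   PLAINTEXT://broker:29092          -> advertised to containers on the same Docker network
#   PLAINTEXT_HOST://<addr>:9092      -> advertised to clients outside the VM (e.g. the Spark cluster);
#                                        falls back to localhost when KAFKA_ADDRESS is unset
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://${KAFKA_ADDRESS:-localhost}:9092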

@stephenllh
Author

stephenllh commented Apr 26, 2022

I just noticed this in the terraform config:

resource "google_compute_firewall" "port_rules" {
  project     = var.project
  name        = "kafka-broker-port"
  network     = var.network
  description = "Opens port 9092 in the Kafka VM for Spark cluster to connect"

  allow {
    protocol = "tcp"
    ports    = ["9092"]
  }

  source_ranges = ["0.0.0.0/0"]
  target_tags   = ["kafka"]

}

I didn't follow everything in your project, so I missed this part. Is this the reason it wasn't working? But even without this step, VS Code does show port 9092 as forwarded when I SSH into the Kafka VM. Or are those totally different things?

@stephenllh
Author

Hey, when I set the port rules, it worked! It turns out this step is important.

@ankurchavda
Owner

So in VS Code, you are forwarding the port, which is not the same as opening the port. You are able to forward the port to your local computer because you already authenticated when you SSHed in. Opening a port, on the other hand, allows connections from other VMs in the network. By default, no ports are accessible to other VMs inside or outside the network; you need to specify which ports you would like to open to accept connections.
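
Put differently, VS Code's forwarding is roughly equivalent to an SSH tunnel, which only you can use over your authenticated session, whereas the firewall rule above opens the port to other machines. The user and IP below are placeholders:

# Forwarding: tunnel the remote Control Center port to your laptop over SSH
ssh -L 9021:localhost:9021 <user>@<kafka-vm-external-ip>

# Opening: allow inbound TCP 9092 from other hosts -- in this project that is the
# google_compute_firewall "port_rules" resource shown earlier, not an SSH tunnel.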

@stephenllh
Author

Ah I see! This explanation is helpful.

@stephenllh
Author

I have another question. I see that you set up a Dataproc cluster for Spark Streaming. Do you think it makes sense if I do everything on a single VM and run it locally? I was thinking everything would involve "localhost" and the master would be "local[*]", etc.

@ankurchavda
Owner

You can. However, in the real world Spark would almost always be running on some kind of cluster, so making things run on a cluster is the wiser choice if you plan to showcase the infra choices in an interview or somewhere else.
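
For reference, the code change between the two setups is small. A sketch, assuming nothing else about the job changes:

from pyspark.sql import SparkSession

# Single-VM / local mode: driver and executors run as threads on one machine.
spark = (
    SparkSession.builder
    .master("local[*]")              # use all local cores
    .appName("streaming-local")
    .getOrCreate()
)

# On a cluster (e.g. Dataproc/YARN) you would normally omit .master() in code and let
# spark-submit choose it, e.g.: spark-submit --master yarn stream_all_events.py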

@stephenllh
Author

I see. Yeah you made a great point.

@stephenllh
Author

stephenllh commented May 3, 2022

By the way, I am not sure whether this is a mistake on my part, but I think I might have found some potential bugs:

  1. The KAFKA_ADDRESS variable in the Kafka docker-compose YAML file is unnecessary, since both Kafka and the EventSim container run in the same virtual machine. Kindly refer to this link.

The relevant change I made was removing the environment variable reference, so the line becomes:

KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092

Otherwise, I couldn't make it work. (Ignore my reply from a few days ago; I also thought it worked when I set the env variable to the external IP of the VM, but it actually did not. My bad.)

  2. For this command,
spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2 stream_all_events.py

I think there is a mistake in the Spark version. In the Spark README, version 3.0.3 is installed. When I changed the command to:

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.3 stream_all_events.py

it finally worked.

@ankurchavda
Owner

Hey, apologies for the late reply. There's a lot going on at this point and I kinda missed this comment.

  1. I had to explicitly start writing the messages to the external IP instead of localhost so that my Spark Streaming process could connect and stream them. Earlier I had the same line as yours in the YAML file; I then had to add the env variable to make it work. So I am not sure why it's the opposite in your case.
  2. Yes, I used the Dataproc cluster, which has Spark 3.1.2 installed by default, hence my command was for that version (see the quick version check below).
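
A quick way to avoid that mismatch is to check the Spark version on the machine that will actually run the job, and pin the connector to the same version:

spark-submit --version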

@stephenllh
Author

No worries. But I wonder if it would be fine to talk somewhere other than here? I also miss comments here sometimes. Perhaps Slack is good, but I am fine with anything.

I now have questions regarding the dbt + Airflow part. I am very new to SQL, so I really need your help to understand the logic behind the SQL queries you wrote.

  1. In airflow/dags/sql/auth_events.sql, you use a lot of COALESCE. I assume this is to handle null values. But what about the 9999999999999 for registration? What does it mean?

  2. I think I understand what is going on in the SQL query below. But the SELECT part at the bottom looks kinda random to me. Can you explain the meaning of the string with multiple Ns, the two '0's, and the two 'NA's?

{{ config(materialized = 'table') }}

SELECT {{ dbt_utils.surrogate_key(['artistId']) }} AS artistKey,
    *
FROM (
        SELECT 
            MAX(artist_id) AS artistId,
            MAX(artist_latitude) AS latitude,
            MAX(artist_longitude) AS longitude,
            MAX(artist_location) AS location,
            REPLACE(REPLACE(artist_name, '"', ''), '\\', '') AS name
        FROM {{ source('staging', 'songs') }}
        GROUP BY artist_name

        UNION ALL

        SELECT 'NNNNNNNNNNNNNNN',
            0,
            0,
            'NA',
            'NA'
    )

@ankurchavda
Owner

ankurchavda commented May 7, 2022

Sure, feel free to reach out over Slack. I assume you must be in the DataTalks Slack workspace.

For 1 and 2: both are techniques to handle dimensions where the value is null or unknown. It's kinda hard to explain over chat, but the idea is that there should not be any null dimension keys in your fact table. A null artist represents nothing and also messes up aggregates and visualizations. Instead, we add a record to the artists dimension so that all null artist IDs are represented by NNNNNNNNNNNNNNN. It just adds a little clarity to your data and helps you track things. I hope this makes sense. You'll come across this when you read about dimensional modelling; it's a fairly common topic.

Here's a short explanation on the Kimball group blog about the same.
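
To illustrate the pattern (table, column, and model names below are illustrative, not the project's exact dbt models): the fact side coalesces null artist IDs to the same NNNNNNNNNNNNNNN key, so every fact row joins to a row in the artists dimension, including the unknown-member row added by the UNION ALL above.

-- Sketch only: names are illustrative
SELECT
    a.artistKey,
    COUNT(*) AS listen_count
FROM {{ source('staging', 'listen_events') }} le
LEFT JOIN {{ ref('artists') }} a
    ON COALESCE(le.artist_id, 'NNNNNNNNNNNNNNN') = a.artistId
GROUP BY a.artistKey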

@ankurchavda ankurchavda reopened this May 24, 2022