๐Ÿ“’ Today I Learn/๐Ÿ Python

Spark๋ฅผ ์ด์šฉํ•œ ๋น…๋ฐ์ดํ„ฐ ๋ถ„์„ (3)

ny:D 2024. 7. 19. 16:10

240718 Today I Learn

์ŠคํŒŒํฌ์˜ ๊ธฐ๋ณธ ๊ตฌ์กฐ

Lazy Compuation

  • Spark๋Š” ๋ฐ”๋กœ๋ฐ”๋กœ ์ฃผ์–ด์ง„ ๋ณ€ํ™˜Transformation์„ ์ˆ˜ํ–‰ํ•˜์ง€ ์•Š๊ณ , ์—ฌ๋Ÿฌ ๋ณ€ํ™˜๋“ค์„ ์ฐจ๊ณก์ฐจ๊ณก ์Œ“์•„๋‘” ๋’ค, ํ•œ ๋ฒˆ์— ์ตœ์ ํ™”ํ•˜์—ฌ ์ˆ˜ํ–‰
  • ๋ฏธ๋ฃจ์–ด๋‘” ์ผ์ด ์Œ“์ธ ๊ฒƒ์„ Lineage๋ผ๊ณ  ํ•˜๋Š”๋ฐ, ์ „์ฒด ๋ณ€ํ™˜์„ ๋ณด๊ณ  ์ค‘๊ฐ„์— ํ•„์š”์—†๋Š” ์—ฐ์‚ฐ์€ ์ œ๊ฑฐํ•˜๊ธฐ ์œ„ํ•ด ๊ณ ์˜์ ์œผ๋กœ ์ด๋Ÿฌํ•œ ์ง€์—ฐ์„ ๋งŒ๋“œ๋Š” ๊ฒƒ์ด๋‹ค.
  • ๋ณ€ํ™˜์ด ์Œ“์ธ Lineage๋ฅผ ์ˆ˜ํ–‰ํ•˜๋„๋ก ๊ตฌ๋™Triggerํ•˜๋Š” ์ž‘์—…๋“ค์„ Action์ด๋ผ๊ณ  ํ•จ.
    → count, collect, take, top, show, write, toDF ๋“ฑ

๋ถ„์‚ฐ์ฒ˜๋ฆฌ์™€ Partition

  • Partition : ๋ชจ๋“  executer๊ฐ€ ๋ณ‘๋ ฌ๋กœ ์ผํ•  ์ˆ˜ ์žˆ๋„๋ก ์ชผ๊ฐ  ๋ฐ์ดํ„ฐ์˜ ๋‹จ์œ„
    • ํŒŒํ‹ฐ์…˜์„ ์ง์ ‘ ์กฐ์ž‘ํ•ด ์›ํ•˜๋Š” ๋ฐ์ดํ„ฐ๋ฅผ ํŒŒํ‹ฐ์…˜์— ๊ฐ•์ œ๋กœ ๋ฐฐ์ •ํ•  ์ˆ˜ ์—†์Œ 
      → high-level ์„ค๊ณ„๋งŒ ๊ฐ€๋Šฅ, ์‹ค์ œ ์‹œํ–‰์€ spark๊ฐ€ ์•Œ์•„์„œ ํ•จ
    • Repartition ์„ ํ†ตํ•ด ์žฌ๊ตฌ์„ฑ ๊ฐ€๋Šฅ
      → ํŠน์ • ํŒŒํ‹ฐ์…˜์˜ ๋ฉ”๋ชจ๋ฆฌ ๋ถ€ํ•˜๊ฐ€ ์‹ฌํ•˜๋ฉด OOM์œผ๋กœ ์ŠคํŒŒํฌ๊ฐ€ ์ข…๋ฃŒ๋  ์ˆ˜ ์žˆ์Œ.
    • ํŒŒํ‹ฐ์…˜ ์‚ฌ์ด์˜ ๋ฐ์ดํ„ฐ ํ†ต์‹ (=๋„คํŠธ์›Œํฌ ํ†ต์‹ )์€ ๋น„์‹ผ ์—ฐ์‚ฐ
      → ๊ฐ€๋Šฅํ•˜๋ฉด ํŒŒํ‹ฐ์…˜ ๋‚ด๋ถ€์—์„œ ์—ฐ์‚ฐ์ด ๊ฐ€๋Šฅํ•˜๋„๋ก ํ•˜๊ธฐ

Spark ์ž‘์—…์˜ ์ˆ˜ํ–‰๊ณผ์ •

  1. DataFrame ๋“ฑ์œผ๋กœ ์ฝ”๋“œ๋ฅผ ์ž‘์„ฑ
  2. ์ด๋ฅผ ๋…ผ๋ฆฌ์  ์‹คํ–‰ ๊ณ„ํš(Catalyst Optimizer)์œผ๋กœ ๋ณ€ํ™˜
    → Driver์™€ Execute์˜ ์ •๋ณด๋ฅผ ๊ณ ๋ คํ•˜์ง€ ์•Š๊ณ , ๋…ผ๋ฆฌ์ ์ธ ๊ฐ€๋Šฅ์„ฑ๋งŒ ์ฐธ์กฐ
  3. ์ŠคํŒŒํฌ๋Š” ๋…ผ๋ฆฌ์  ์‹คํ–‰ ๊ณ„ํš์„ ๋ฌผ๋ฆฌ์  ์‹คํ–‰ ๊ณ„ํš์œผ๋กœ ๋ณ€ํ™˜ํ•˜๋ฉฐ, ๊ทธ ๊ณผ์ •์—์„œ ์ถ”๊ฐ€์  ์ตœ์ ํ™” ๊ฐ€๋Šฅ ์—ฌ๋ถ€๋ฅผ ํ™•์ธ
    → ํ…Œ์ด๋ธ”์˜ ํฌ๊ธฐ์™€ ํŒŒํ‹ฐ์…˜ ์ˆ˜ ๋“ฑ์„ ๊ณ ๋ ค
  4. ํด๋Ÿฌ์Šคํ„ฐ์—์„œ ๋ฌผ๋ฆฌ์  ์‹คํ–‰ ๊ณ„ํš(RDD ์ฒ˜๋ฆฌ)๋ฅผ ์‹คํ–‰ํ•ฉ๋‹ˆ๋‹ค


์ŠคํŒŒํฌ ๊ตฌ๋™ํ•˜๊ธฐ

์ŠคํŒŒํฌ ์„ธ์…˜

์ŠคํŒŒํฌ ์„ธ์…˜์€ Spark Applications์— ๋ช…๋ น์–ด์™€ ๋ฐ์ดํ„ฐ๋ฅผ ์ „๋‹ฌ.
→ PySpark์—์„œ๋„ Sparksession์„ ๋„์šฐ๋ฉด์„œ Spark๋ฅผ ์‹œ์ž‘ํ•จ

## Session ์ƒ์„ฑ
spark = (
    SparkSession
    .builder
    .appName("First Session")
    .getOrCreate()
)

spark

 

์ŠคํŒŒํฌ ๊ฟ€ํŒ

๋ธŒ๋กœ๋“œ ์บ์ŠคํŠธ

๐Ÿ’ก ๋ธŒ๋กœ๋“œ์บ์ŠคํŠธ
  • ๋ชจ๋“  Worker์—๊ฒŒ ํฐ ๊ทœ๋ชจ์˜ ์ž…๋ ฅ ๋ฐ์ดํ„ฐ์…‹์„ ํšจ์œจ์ ์œผ๋กœ ์ œ๊ณตํ•  ๋•Œ ์‚ฌ์šฉํ•˜๋Š” ๋ฐฉ๋ฒ•
  • ๋ฉ”๋ชจ๋ฆฌ๋ฅผ ํšจ์œจ์ ์œผ๋กœ ์‚ฌ์šฉํ•˜๋Š” ๋ฐฉ๋ฒ•๋ก  ์ค‘ ํ•˜๋‚˜
  • Numpy ๋ธŒ๋กœ๋“œ ์บ์ŠคํŠธ

  • Spark ๋ธŒ๋กœ๋“œ ์บ์ŠคํŠธ
    • ๋ชจ๋“  Executor๊ฐ€ ์ ‘๊ทผ ๊ฐ€๋Šฅํ•œ ๊ณตํ†ต์˜ ๋ณ€์ˆ˜
    • ๋„ˆ๋ฌด ํฐ ๋ฐ์ดํ„ฐ๋ฅผ ์‚ฌ์šฉํ•˜๋Š” ๊ฒฝ์šฐ ์˜คํžˆ๋ ค ๋ถ€ํ•˜๊ฐ€ ์‹ฌํ•ด์งˆ ์ˆ˜ ์žˆ์Œ.
    • spark.sparkContext.broadcast()๋ฅผ ์ด์šฉํ•˜๊ฒŒ ๋จ.
    • ๋ชจ๋“  Executor๊ฐ€ ๊ณต์œ ํ•˜๋˜ ๊ทธ ๊ฐ’์ด ๋ณ€ํ•  ์ˆ˜ ์ž†๋Š” ๊ฒƒ์„ ์–ดํ๋ฌผ๋ ˆ์ดํ„ฐ๋ผ ํ•ฉ๋‹ˆ๋‹ค

Join

big to big / big to small join

  • Big to Big join(= ์…”ํ”Œ ์กฐ์ธ) : ํฐ ํ…Œ์ด๋ธ” ๋ผ๋ฆฌ์˜ ์กฐ์ธ
    • ๋ชจ๋“  executer๊ฐ€ ๋‹ค๋ฅธ executer์™€ ํ†ต์‹ 
    • ๋ฐ์ดํ„ฐ๊ฐ€ ์–ด๋–ป๊ฒŒ ๋ถ„ํ• ๋˜์–ด์žˆ๋А๋ƒ์— ๋”ฐ๋ผ ํ˜ผ์žก๋„๊ฐ€ ๋‹ฌ๋ผ์ง
    • ๊ฐ€๋Šฅํ•˜๋ฉด ํ”ผํ•˜๋Š”๊ฒŒ ์ข‹์Œ → ์–ด๋–ป๊ฒŒํ•˜๋ฉด ํšจ์œจ์ ์œผ๋กœ ์‚ฌ์šฉํ•  ์ˆ˜ ์žˆ์„๊นŒ๋ฅผ ๊ณ ๋ฏผ
  • Big to Small join : ํ•˜๋‚˜์˜ ํฐ ํ…Œ์ด๋ธ”๊ณผ ์ž‘์€ ํ…Œ์ด๋ธ”์„ ๋ณ‘ํ•ฉ
    • ์ž‘์€ ํ…Œ์ด๋ธ”์„ ๋ณต์‚ฌํ•ด executer์— ๋ณด๋ƒ„.
    • ์ฒ˜์Œ์—๋งŒ ๋Œ€๊ทœ๋ชจ ํ†ต์‹ ์ด ํ•„์š”ํ•˜๊ณ , ์ดํ›„์—๋Š” ๋…ธ๋“œ๊ฐ„ ํ†ต์‹ ์ด ๋ถˆํ•„์š”
  • Small to Small join
  • ๊ธฐํƒ€ join ํŒ
    • ๋ฐ์ดํ„ฐ๊ฐ€ ์ž˜ ๋ถ„ํ• ๋˜์–ด์žˆ๋‹ค๋ฉด big to big ์กฐ์ธ๋„ ๋น ๋ฅด๊ฒŒ ์ˆ˜ํ–‰ ๊ฐ€๋Šฅ (ํ•˜์ง€๋งŒ repartition ํ•  ์ •๋„๋Š” ์•„๋‹˜)
    • ์–ด๋–ค ์กฐ์ธ์€ ํ•„ํ„ฐ ์—ญํ• ์„ ์ˆ˜ํ–‰ํ•˜๊ธฐ ๋•Œ๋ฌธ์— join ์ˆœ์„œ์— ๋”ฐ๋ผ ์†๋„๊ฐ€ ๋‹ฌ๋ผ์งˆ ์ˆ˜ ์žˆ์Œ.

๊ณ ๊ธ‰ I/O

  • inferschem : ์ž๋™์œผ๋กœ ๋ฐ์ดํ„ฐ ํƒ€์ž…์„ ์ถ”๋ก ํ•˜์ง€๋งŒ ๋‹ค์†Œ ๋А๋ฆฌ๊ณ  ์˜๋„ํ•˜์ง€ ์•Š์€ ํƒ€์ž…์„ ํ• ๋‹นํ•  ์ˆ˜๋„ ์žˆ์Œ
    → ์Šคํ‚ค๋งˆ๋ฅผ ๊ฐ•์ œํ•˜๋Š” ๊ฒƒ์ด ๋‚˜์Œ
  • Write option
    • mode: overwrite๋ผ๊ณ  ๋ช…์‹œํ•˜์ง€ ์•Š์œผ๋ฉด, ๊ธฐ์กด์˜ ํŒŒ์ผ์ด ์žˆ์„ ๋•Œ ์—๋Ÿฌ๊ฐ€ ๋‚จ.
    • partitionBy: ์–ด๋–ค ์นผ๋Ÿผ์„ ๊ธฐ์ค€์œผ๋กœ ๋ฐ์ดํ„ฐ๋ฅผ ๋ถ„ํ• ํ•ด ์ €์žฅํ• ์ง€ ๋ช…์‹œ
    • maxPartitionBytes: ๋ถ„ํ• ๋œ ํŒŒ์ผ์˜ ์ตœ๋Œ€ ํฌ๊ธฐ๋ฅผ ์กฐ์ •
  • ETC
    • ํŒŒ์ผ ํฌ๊ธฐ ๊ด€๋ฆฌ ์ค‘์š” (์ผ๋ฐ˜์ ์œผ๋กœ 500MB ~ 2GB์„ ํƒ)
    • ์—ฌ๋Ÿฌ executer๊ฐ€ ๊ฐ™์€ ํŒŒ์ผ์„ ๋™์‹œ์— ์ฝ์„ ์ˆ˜๋Š” ์—†์ง€๋งŒ, ์—ฌ๋Ÿฌ ํŒŒ์ผ์„ ๋™์‹œ์— ์ฝ์„ ์ˆ˜๋Š” ์žˆ์Œ
    • ๋ณตํ•ฉ ๋ฐ์ดํ„ฐ์œ ํ˜•(nested)์€ ๋ชจ๋“  ํŒŒ์ผ ํฌ๋งท์—์„œ ์ ํ•ฉํ•˜์ง€๋Š” ์•Š์œผ๋ฏ€๋กœ ๊ถŒ์žฅ๋˜์ง€ ์•Š์Œ.