From be1d98b9a06ebd7f1f576507bce511f79be0b80a Mon Sep 17 00:00:00 2001
From: William Benton <willb@redhat.com>
Date: Thu, 27 Feb 2014 16:43:44 -0600
Subject: [PATCH 5/9] Removed code depending on Kryo
Conflicts:
core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
---
.../apache/spark/serializer/KryoSerializer.scala | 175 ---------------------
.../apache/spark/storage/StoragePerfTester.scala | 103 ------------
.../org/apache/spark/storage/ThreadingTest.scala | 115 --------------
.../util/collection/ExternalAppendOnlyMap.scala | 1 +
.../apache/spark/graphx/GraphKryoRegistrator.scala | 48 ------
.../apache/spark/mllib/recommendation/ALS.scala | 12 --
.../spark/streaming/util/RawTextSender.scala | 82 ----------
7 files changed, 1 insertion(+), 535 deletions(-)
delete mode 100644 core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
delete mode 100644 core/src/main/scala/org/apache/spark/storage/StoragePerfTester.scala
delete mode 100644 core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala
delete mode 100644 graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala
delete mode 100644 streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala
diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
deleted file mode 100644
index c14cd47..0000000
--- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
+++ /dev/null
@@ -1,175 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.serializer
-
-import java.nio.ByteBuffer
-import java.io.{EOFException, InputStream, OutputStream}
-
-import com.esotericsoftware.kryo.serializers.{JavaSerializer => KryoJavaSerializer}
-import com.esotericsoftware.kryo.{KryoException, Kryo}
-import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
-import com.twitter.chill.{EmptyScalaKryoInstantiator, AllScalaRegistrar}
-
-import org.apache.spark._
-import org.apache.spark.broadcast.HttpBroadcast
-import org.apache.spark.scheduler.MapStatus
-import org.apache.spark.storage._
-import org.apache.spark.storage.{GetBlock, GotBlock, PutBlock}
-
-/**
- * A Spark serializer that uses the [[https://code.google.com/p/kryo/ Kryo serialization library]].
- */
-class KryoSerializer(conf: SparkConf) extends org.apache.spark.serializer.Serializer with Logging {
- private val bufferSize = {
- conf.getInt("spark.kryoserializer.buffer.mb", 2) * 1024 * 1024
- }
-
- def newKryoOutput() = new KryoOutput(bufferSize)
-
- def newKryo(): Kryo = {
- val instantiator = new EmptyScalaKryoInstantiator
- val kryo = instantiator.newKryo()
- val classLoader = Thread.currentThread.getContextClassLoader
-
- // Allow disabling Kryo reference tracking if user knows their object graphs don't have loops.
- // Do this before we invoke the user registrator so the user registrator can override this.
- kryo.setReferences(conf.getBoolean("spark.kryo.referenceTracking", true))
-
- for (cls <- KryoSerializer.toRegister) kryo.register(cls)
-
- // Allow sending SerializableWritable
- kryo.register(classOf[SerializableWritable[_]], new KryoJavaSerializer())
- kryo.register(classOf[HttpBroadcast[_]], new KryoJavaSerializer())
-
- // Allow the user to register their own classes by setting spark.kryo.registrator
- try {
- for (regCls <- conf.getOption("spark.kryo.registrator")) {
- logDebug("Running user registrator: " + regCls)
- val reg = Class.forName(regCls, true, classLoader).newInstance().asInstanceOf[KryoRegistrator]
- reg.registerClasses(kryo)
- }
- } catch {
- case e: Exception => logError("Failed to run spark.kryo.registrator", e)
- }
-
- // Register Chill's classes; we do this after our ranges and the user's own classes to let
- // our code override the generic serialziers in Chill for things like Seq
- new AllScalaRegistrar().apply(kryo)
-
- kryo.setClassLoader(classLoader)
- kryo
- }
-
- def newInstance(): SerializerInstance = {
- new KryoSerializerInstance(this)
- }
-}
-
-private[spark]
-class KryoSerializationStream(kryo: Kryo, outStream: OutputStream) extends SerializationStream {
- val output = new KryoOutput(outStream)
-
- def writeObject[T](t: T): SerializationStream = {
- kryo.writeClassAndObject(output, t)
- this
- }
-
- def flush() { output.flush() }
- def close() { output.close() }
-}
-
-private[spark]
-class KryoDeserializationStream(kryo: Kryo, inStream: InputStream) extends DeserializationStream {
- val input = new KryoInput(inStream)
-
- def readObject[T](): T = {
- try {
- kryo.readClassAndObject(input).asInstanceOf[T]
- } catch {
- // DeserializationStream uses the EOF exception to indicate stopping condition.
- case _: KryoException => throw new EOFException
- }
- }
-
- def close() {
- // Kryo's Input automatically closes the input stream it is using.
- input.close()
- }
-}
-
-private[spark] class KryoSerializerInstance(ks: KryoSerializer) extends SerializerInstance {
- val kryo = ks.newKryo()
-
- // Make these lazy vals to avoid creating a buffer unless we use them
- lazy val output = ks.newKryoOutput()
- lazy val input = new KryoInput()
-
- def serialize[T](t: T): ByteBuffer = {
- output.clear()
- kryo.writeClassAndObject(output, t)
- ByteBuffer.wrap(output.toBytes)
- }
-
- def deserialize[T](bytes: ByteBuffer): T = {
- input.setBuffer(bytes.array)
- kryo.readClassAndObject(input).asInstanceOf[T]
- }
-
- def deserialize[T](bytes: ByteBuffer, loader: ClassLoader): T = {
- val oldClassLoader = kryo.getClassLoader
- kryo.setClassLoader(loader)
- input.setBuffer(bytes.array)
- val obj = kryo.readClassAndObject(input).asInstanceOf[T]
- kryo.setClassLoader(oldClassLoader)
- obj
- }
-
- def serializeStream(s: OutputStream): SerializationStream = {
- new KryoSerializationStream(kryo, s)
- }
-
- def deserializeStream(s: InputStream): DeserializationStream = {
- new KryoDeserializationStream(kryo, s)
- }
-}
-
-/**
- * Interface implemented by clients to register their classes with Kryo when using Kryo
- * serialization.
- */
-trait KryoRegistrator {
- def registerClasses(kryo: Kryo)
-}
-
-private[serializer] object KryoSerializer {
- // Commonly used classes.
- private val toRegister: Seq[Class[_]] = Seq(
- ByteBuffer.allocate(1).getClass,
- classOf[StorageLevel],
- classOf[PutBlock],
- classOf[GotBlock],
- classOf[GetBlock],
- classOf[MapStatus],
- classOf[BlockManagerId],
- classOf[Array[Byte]],
- (1 to 10).getClass,
- (1 until 10).getClass,
- (1L to 10L).getClass,
- (1L until 10L).getClass
- )
-}
diff --git a/core/src/main/scala/org/apache/spark/storage/StoragePerfTester.scala b/core/src/main/scala/org/apache/spark/storage/StoragePerfTester.scala
deleted file mode 100644
index 40734aa..0000000
--- a/core/src/main/scala/org/apache/spark/storage/StoragePerfTester.scala
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.storage
-
-import java.util.concurrent.atomic.AtomicLong
-import java.util.concurrent.{CountDownLatch, Executors}
-
-import org.apache.spark.serializer.KryoSerializer
-import org.apache.spark.SparkContext
-import org.apache.spark.util.Utils
-
-/**
- * Utility for micro-benchmarking shuffle write performance.
- *
- * Writes simulated shuffle output from several threads and records the observed throughput.
- */
-object StoragePerfTester {
- def main(args: Array[String]) = {
- /** Total amount of data to generate. Distributed evenly amongst maps and reduce splits. */
- val dataSizeMb = Utils.memoryStringToMb(sys.env.getOrElse("OUTPUT_DATA", "1g"))
-
- /** Number of map tasks. All tasks execute concurrently. */
- val numMaps = sys.env.get("NUM_MAPS").map(_.toInt).getOrElse(8)
-
- /** Number of reduce splits for each map task. */
- val numOutputSplits = sys.env.get("NUM_REDUCERS").map(_.toInt).getOrElse(500)
-
- val recordLength = 1000 // ~1KB records
- val totalRecords = dataSizeMb * 1000
- val recordsPerMap = totalRecords / numMaps
-
- val writeData = "1" * recordLength
- val executor = Executors.newFixedThreadPool(numMaps)
-
- System.setProperty("spark.shuffle.compress", "false")
- System.setProperty("spark.shuffle.sync", "true")
-
- // This is only used to instantiate a BlockManager. All thread scheduling is done manually.
- val sc = new SparkContext("local[4]", "Write Tester")
- val blockManager = sc.env.blockManager
-
- def writeOutputBytes(mapId: Int, total: AtomicLong) = {
- val shuffle = blockManager.shuffleBlockManager.forMapTask(1, mapId, numOutputSplits,
- new KryoSerializer(sc.conf))
- val writers = shuffle.writers
- for (i <- 1 to recordsPerMap) {
- writers(i % numOutputSplits).write(writeData)
- }
- writers.map {w =>
- w.commit()
- total.addAndGet(w.fileSegment().length)
- w.close()
- }
-
- shuffle.releaseWriters(true)
- }
-
- val start = System.currentTimeMillis()
- val latch = new CountDownLatch(numMaps)
- val totalBytes = new AtomicLong()
- for (task <- 1 to numMaps) {
- executor.submit(new Runnable() {
- override def run() = {
- try {
- writeOutputBytes(task, totalBytes)
- latch.countDown()
- } catch {
- case e: Exception =>
- println("Exception in child thread: " + e + " " + e.getMessage)
- System.exit(1)
- }
- }
- })
- }
- latch.await()
- val end = System.currentTimeMillis()
- val time = (end - start) / 1000.0
- val bytesPerSecond = totalBytes.get() / time
- val bytesPerFile = (totalBytes.get() / (numOutputSplits * numMaps.toDouble)).toLong
-
- System.err.println("files_total\t\t%s".format(numMaps * numOutputSplits))
- System.err.println("bytes_per_file\t\t%s".format(Utils.bytesToString(bytesPerFile)))
- System.err.println("agg_throughput\t\t%s/s".format(Utils.bytesToString(bytesPerSecond.toLong)))
-
- executor.shutdown()
- sc.stop()
- }
-}
diff --git a/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala b/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala
deleted file mode 100644
index 729ba2c..0000000
--- a/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.storage
-
-import akka.actor._
-
-import java.util.concurrent.ArrayBlockingQueue
-import util.Random
-import org.apache.spark.serializer.KryoSerializer
-import org.apache.spark.{SparkConf, SparkContext}
-
-/**
- * This class tests the BlockManager and MemoryStore for thread safety and
- * deadlocks. It spawns a number of producer and consumer threads. Producer
- * threads continuously pushes blocks into the BlockManager and consumer
- * threads continuously retrieves the blocks form the BlockManager and tests
- * whether the block is correct or not.
- */
-private[spark] object ThreadingTest {
-
- val numProducers = 5
- val numBlocksPerProducer = 20000
-
- private[spark] class ProducerThread(manager: BlockManager, id: Int) extends Thread {
- val queue = new ArrayBlockingQueue[(BlockId, Seq[Int])](100)
-
- override def run() {
- for (i <- 1 to numBlocksPerProducer) {
- val blockId = TestBlockId("b-" + id + "-" + i)
- val blockSize = Random.nextInt(1000)
- val block = (1 to blockSize).map(_ => Random.nextInt())
- val level = randomLevel()
- val startTime = System.currentTimeMillis()
- manager.put(blockId, block.iterator, level, true)
- println("Pushed block " + blockId + " in " + (System.currentTimeMillis - startTime) + " ms")
- queue.add((blockId, block))
- }
- println("Producer thread " + id + " terminated")
- }
-
- def randomLevel(): StorageLevel = {
- math.abs(Random.nextInt()) % 4 match {
- case 0 => StorageLevel.MEMORY_ONLY
- case 1 => StorageLevel.MEMORY_ONLY_SER
- case 2 => StorageLevel.MEMORY_AND_DISK
- case 3 => StorageLevel.MEMORY_AND_DISK_SER
- }
- }
- }
-
- private[spark] class ConsumerThread(
- manager: BlockManager,
- queue: ArrayBlockingQueue[(BlockId, Seq[Int])]
- ) extends Thread {
- var numBlockConsumed = 0
-
- override def run() {
- println("Consumer thread started")
- while(numBlockConsumed < numBlocksPerProducer) {
- val (blockId, block) = queue.take()
- val startTime = System.currentTimeMillis()
- manager.get(blockId) match {
- case Some(retrievedBlock) =>
- assert(retrievedBlock.toList.asInstanceOf[List[Int]] == block.toList,
- "Block " + blockId + " did not match")
- println("Got block " + blockId + " in " +
- (System.currentTimeMillis - startTime) + " ms")
- case None =>
- assert(false, "Block " + blockId + " could not be retrieved")
- }
- numBlockConsumed += 1
- }
- println("Consumer thread terminated")
- }
- }
-
- def main(args: Array[String]) {
- System.setProperty("spark.kryoserializer.buffer.mb", "1")
- val actorSystem = ActorSystem("test")
- val conf = new SparkConf()
- val serializer = new KryoSerializer(conf)
- val blockManagerMaster = new BlockManagerMaster(
- actorSystem.actorOf(Props(new BlockManagerMasterActor(true, conf))), conf)
- val blockManager = new BlockManager(
- "<driver>", actorSystem, blockManagerMaster, serializer, 1024 * 1024, conf)
- val producers = (1 to numProducers).map(i => new ProducerThread(blockManager, i))
- val consumers = producers.map(p => new ConsumerThread(blockManager, p.queue))
- producers.foreach(_.start)
- consumers.foreach(_.start)
- producers.foreach(_.join)
- consumers.foreach(_.join)
- blockManager.stop()
- blockManagerMaster.stop()
- actorSystem.shutdown()
- actorSystem.awaitTermination()
- println("Everything stopped.")
- println(
- "It will take sometime for the JVM to clean all temporary files and shutdown. Sit tight.")
- }
-}
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 6f36817..c4f3efe 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -30,6 +30,7 @@ import org.apache.spark.{Logging, SparkEnv}
import org.apache.spark.serializer.Serializer
import org.apache.spark.storage.{BlockId, BlockManager}
+
/**
* An append-only map that spills sorted content to disk when there is insufficient space for it
* to grow.
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala
deleted file mode 100644
index dd380d8..0000000
--- a/graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.graphx
-
-import com.esotericsoftware.kryo.Kryo
-
-import org.apache.spark.graphx.impl._
-import org.apache.spark.serializer.KryoRegistrator
-import org.apache.spark.util.collection.BitSet
-import org.apache.spark.util.BoundedPriorityQueue
-
-/**
- * Registers GraphX classes with Kryo for improved performance.
- */
-class GraphKryoRegistrator extends KryoRegistrator {
-
- def registerClasses(kryo: Kryo) {
- kryo.register(classOf[Edge[Object]])
- kryo.register(classOf[MessageToPartition[Object]])
- kryo.register(classOf[VertexBroadcastMsg[Object]])
- kryo.register(classOf[(VertexId, Object)])
- kryo.register(classOf[EdgePartition[Object]])
- kryo.register(classOf[BitSet])
- kryo.register(classOf[VertexIdToIndexMap])
- kryo.register(classOf[VertexAttributeBlock[Object]])
- kryo.register(classOf[PartitionStrategy])
- kryo.register(classOf[BoundedPriorityQueue[Object]])
- kryo.register(classOf[EdgeDirection])
-
- // This avoids a large number of hash table lookups.
- kryo.setReferences(false)
- }
-}
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala
index 44db51c..f13781a 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala
@@ -26,10 +26,8 @@ import org.apache.spark.broadcast.Broadcast
import org.apache.spark.{Logging, HashPartitioner, Partitioner, SparkContext, SparkConf}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.rdd.RDD
-import org.apache.spark.serializer.KryoRegistrator
import org.apache.spark.SparkContext._
-import com.esotericsoftware.kryo.Kryo
import org.jblas.{DoubleMatrix, SimpleBlas, Solve}
@@ -641,12 +639,6 @@ object ALS {
trainImplicit(ratings, rank, iterations, 0.01, -1, 1.0)
}
- private class ALSRegistrator extends KryoRegistrator {
- override def registerClasses(kryo: Kryo) {
- kryo.register(classOf[Rating])
- }
- }
-
def main(args: Array[String]) {
if (args.length < 5 || args.length > 9) {
println("Usage: ALS <master> <ratings_file> <rank> <iterations> <output_dir> " +
@@ -660,10 +652,6 @@ object ALS {
val alpha = if (args.length >= 8) args(7).toDouble else 1
val blocks = if (args.length == 9) args(8).toInt else -1
val conf = new SparkConf()
- .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
- .set("spark.kryo.registrator", classOf[ALSRegistrator].getName)
- .set("spark.kryo.referenceTracking", "false")
- .set("spark.kryoserializer.buffer.mb", "8")
.set("spark.locality.wait", "10000")
val sc = new SparkContext(master, "ALS", conf)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala
deleted file mode 100644
index 684b38e..0000000
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.util
-
-import java.io.IOException
-import java.net.ServerSocket
-import java.nio.ByteBuffer
-
-import scala.io.Source
-
-import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream
-
-import org.apache.spark.{SparkConf, Logging}
-import org.apache.spark.serializer.KryoSerializer
-import org.apache.spark.util.IntParam
-
-/**
- * A helper program that sends blocks of Kryo-serialized text strings out on a socket at a
- * specified rate. Used to feed data into RawInputDStream.
- */
-private[streaming]
-object RawTextSender extends Logging {
- def main(args: Array[String]) {
- if (args.length != 4) {
- System.err.println("Usage: RawTextSender <port> <file> <blockSize> <bytesPerSec>")
- System.exit(1)
- }
- // Parse the arguments using a pattern match
- val Array(IntParam(port), file, IntParam(blockSize), IntParam(bytesPerSec)) = args
-
- // Repeat the input data multiple times to fill in a buffer
- val lines = Source.fromFile(file).getLines().toArray
- val bufferStream = new FastByteArrayOutputStream(blockSize + 1000)
- val ser = new KryoSerializer(new SparkConf()).newInstance()
- val serStream = ser.serializeStream(bufferStream)
- var i = 0
- while (bufferStream.position < blockSize) {
- serStream.writeObject(lines(i))
- i = (i + 1) % lines.length
- }
- bufferStream.trim()
- val array = bufferStream.array
-
- val countBuf = ByteBuffer.wrap(new Array[Byte](4))
- countBuf.putInt(array.length)
- countBuf.flip()
-
- val serverSocket = new ServerSocket(port)
- logInfo("Listening on port " + port)
-
- while (true) {
- val socket = serverSocket.accept()
- logInfo("Got a new connection")
- val out = new RateLimitedOutputStream(socket.getOutputStream, bytesPerSec)
- try {
- while (true) {
- out.write(countBuf.array)
- out.write(array)
- }
- } catch {
- case e: IOException =>
- logError("Client disconnected")
- socket.close()
- }
- }
- }
-}
--
1.8.3.4 (Apple Git-47)