How to create a custom Twitter receiver for Apache Spark Streaming: an example

Apache Spark offers streaming capabilities with several predefined connectors (Kafka, sockets, file systems, etc.). Additional data sources can be connected by extending the Receiver class. This example implements a custom Twitter receiver for Apache Spark Streaming.

The complete example can be downloaded here: https://github.com/melphi/spark-examples/tree/master/streaming-twitter-custom-receiver

Maven dependencies

The project includes the Twitter4j library and Spark streaming dependencies.

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.sparkexamples</groupId>
    <artifactId>streaming-twitter-custom-receiver</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>

    <dependencies>
        <!-- Spark -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>2.1.0</version>
        </dependency>

        <!-- Twitter client -->
        <dependency>
            <groupId>org.twitter4j</groupId>
            <artifactId>twitter4j-stream</artifactId>
            <version>4.0.6</version>
        </dependency>

        <!-- Logging -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.22</version>
        </dependency>

        <!-- Testing -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
    </dependencies>
</project>

Spark Twitter custom receiver

The Twitter stream connector extends the Receiver class. In this example, the custom Twitter receiver listens for status messages containing the word "twitter".

The source code can be found here: https://github.com/melphi/spark-examples/blob/master/streaming-twitter-custom-receiver/src/main/java/org/sparkexample/TwitterReceiver.java

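import static com.google.common.base.Preconditions.checkArgument;

import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.receiver.Receiver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import twitter4j.FilterQuery;
import twitter4j.StallWarning;
import twitter4j.Status;
import twitter4j.StatusDeletionNotice;
import twitter4j.StatusListener;
import twitter4j.TwitterStream;
import twitter4j.TwitterStreamFactory;

public final class TwitterReceiver extends Receiver<Status> {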
  private static final Logger LOGGER = LoggerFactory.getLogger(TwitterReceiver.class);

  /**
   * The keywords to be tracked.
   */
  private static final String KEYWORDS = "twitter";

  private final TwitterStream twitterStream;

  private StatusListener listener;

  public TwitterReceiver(StorageLevel storageLevel) {
    super(storageLevel);
    checkArgument(StorageLevel.MEMORY_ONLY().equals(storageLevel),
        String.format("Only [%s] supported.", StorageLevel.MEMORY_ONLY().toString()));
    twitterStream = new TwitterStreamFactory().getInstance();
  }

  @Override
  public void onStart() {
    if (listener == null) {
      listener = new StreamListener();
    }
    twitterStream.addListener(listener);
    twitterStream.filter(createFilter());
  }

  private FilterQuery createFilter() {
    FilterQuery filterQuery = new FilterQuery();
    try {
      filterQuery.track(KEYWORDS);
    } catch (Exception e) {
      LOGGER.error(e.getMessage(), e);
      throw new IllegalArgumentException(e);
    }
    return filterQuery;
  }

  @Override
  public void onStop() {
    twitterStream.clearListeners();
    twitterStream.cleanUp();
    listener = null;
  }

  private class StreamListener implements StatusListener {
    @Override
    public void onStatus(Status status) {
      store(status);
    }

    @Override
    public void onDeletionNotice(StatusDeletionNotice statusDeletionNotice) {
      // Intentionally empty.
    }

    @Override
    public void onTrackLimitationNotice(int numberOfLimitedStatuses) {
      // Intentionally empty.
    }

    @Override
    public void onScrubGeo(long userId, long upToStatusId) {
      // Intentionally empty.
    }

    @Override
    public void onStallWarning(StallWarning warning) {
      // Intentionally empty.
    }

    @Override
    public void onException(Exception ex) {
      LOGGER.warn(ex.getMessage(), ex);
    }
  }
}
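
The lifecycle the receiver relies on can be illustrated without Spark or Twitter4j on the classpath. The following is only a minimal sketch: SketchReceiver, SketchSource and SketchTextReceiver are hypothetical stand-ins invented for this illustration (they are not Spark or Twitter4j classes), and store() here just keeps items in memory, whereas Spark's store() hands them to the streaming engine. The point it shows is the same as in TwitterReceiver: onStart() only registers a listener and returns immediately, and onStop() detaches it.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

/** Hypothetical stand-in for Spark's Receiver; stored items are simply kept in a list. */
abstract class SketchReceiver<T> {
  private final List<T> stored = new CopyOnWriteArrayList<>();

  /** Must return quickly: it only wires up the data source. */
  abstract void onStart();

  /** Releases whatever onStart() set up. */
  abstract void onStop();

  /** In this sketch the item is only recorded in memory. */
  void store(T item) {
    stored.add(item);
  }

  List<T> storedItems() {
    return stored;
  }
}

/** Hypothetical push-style source, playing the role of the Twitter stream. */
final class SketchSource {
  private final List<Consumer<String>> listeners = new CopyOnWriteArrayList<>();

  void addListener(Consumer<String> listener) {
    listeners.add(listener);
  }

  void clearListeners() {
    listeners.clear();
  }

  /** Pushes a message to every registered listener. */
  void emit(String message) {
    listeners.forEach(listener -> listener.accept(message));
  }
}

/** Forwards every message pushed by the source to store(). */
final class SketchTextReceiver extends SketchReceiver<String> {
  private final SketchSource source;

  SketchTextReceiver(SketchSource source) {
    this.source = source;
  }

  @Override
  void onStart() {
    source.addListener(this::store);
  }

  @Override
  void onStop() {
    source.clearListeners();
  }
}
```

Messages emitted before onStart() or after onStop() never reach store(), which is the behaviour one would expect from the real receiver as well.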

Twitter4j requires a twitter4j.properties file containing the Twitter authentication parameters. The file must be at the root of the classpath, which in a standard Maven layout means the src/main/resources folder. See http://twitter4j.org/en/configuration.html for more information.
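
A missing credential usually surfaces only as an authentication error once the stream starts, so it can help to check the file up front. The sketch below is an assumption-laden helper, not part of the example project: the class name TwitterConfigCheck and its method are hypothetical, while the four oauth.* keys are the ones Twitter4j reads from twitter4j.properties. It uses only the JDK and takes the file content as a string so it can be tried without real credentials.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

/** Hypothetical helper: reports which OAuth keys are missing from a properties file. */
final class TwitterConfigCheck {
  /** The four OAuth keys Twitter4j reads from twitter4j.properties. */
  static final List<String> REQUIRED_KEYS = Arrays.asList(
      "oauth.consumerKey",
      "oauth.consumerSecret",
      "oauth.accessToken",
      "oauth.accessTokenSecret");

  /** Returns the required keys that are absent or blank in the given file content. */
  static List<String> missingKeys(String content) {
    Properties properties = new Properties();
    try {
      properties.load(new StringReader(content));
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    List<String> missing = new ArrayList<>();
    for (String key : REQUIRED_KEYS) {
      String value = properties.getProperty(key);
      if (value == null || value.trim().isEmpty()) {
        missing.add(key);
      }
    }
    return missing;
  }

  public static void main(String[] args) {
    // Placeholder values only; the fourth key is left out on purpose.
    String sample = String.join("\n",
        "oauth.consumerKey=YOUR_CONSUMER_KEY",
        "oauth.consumerSecret=YOUR_CONSUMER_SECRET",
        "oauth.accessToken=YOUR_ACCESS_TOKEN");
    System.out.println("Missing keys: " + missingKeys(sample));
  }
}
```

Running main with the sample content reports oauth.accessTokenSecret as missing; a complete file yields an empty list.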

Spark streaming twitter task

The new custom Twitter receiver can be used in a Spark streaming task, as in the example below. The complete source code is here: https://github.com/melphi/spark-examples/blob/master/streaming-twitter-custom-receiver/src/main/java/org/sparkexample/TwitterStreamTask.java

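import com.google.common.collect.ImmutableList;
import org.apache.spark.SparkConf;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import twitter4j.GeoLocation;
import twitter4j.Status;
import twitter4j.User;

public class TwitterStreamTask {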
  private static final Class[] KRYO_CLASSES = ImmutableList.builder()
      .add(GeoLocation.class)
      .add(Status.class)
      .add(User.class)
      .build()
      .toArray(new Class[] {});

  private static final Logger LOGGER = LoggerFactory.getLogger(TwitterStreamTask.class);

  public static void main(String[] args) throws InterruptedException {
    new TwitterStreamTask().run();
  }

  public void run() throws InterruptedException {
    SparkConf conf = new SparkConf().setMaster("local[*]")
        .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .registerKryoClasses(KRYO_CLASSES)
        .setAppName("sparkTask");

    JavaStreamingContext streamingContext = new JavaStreamingContext(conf, Durations.seconds(10));

    streamingContext.receiverStream(new TwitterReceiver(StorageLevel.MEMORY_ONLY()))
        .foreachRDD(
            rdd -> rdd.coalesce(10)
                .foreach(message -> LOGGER.info(message.getText())));

    streamingContext.start();
    streamingContext.awaitTermination();
  }
}
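
The Durations.seconds(10) argument sets the batch interval: statuses stored by the receiver are grouped by arrival time, and each group is handed to foreachRDD as one RDD. The sketch below is a conceptual illustration of that grouping in plain Java, not Spark's implementation. The class MicroBatchSketch and its members are hypothetical names introduced here, and unlike Spark, which also processes empty intervals, it only produces batches that contain data.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical illustration of grouping messages into fixed-length batches. */
final class MicroBatchSketch {
  /** A received message with its arrival time in milliseconds since the stream started. */
  static final class Arrival {
    final long millis;
    final String text;

    Arrival(long millis, String text) {
      this.millis = millis;
      this.text = text;
    }
  }

  /** Groups arrivals into consecutive batches of batchMillis, keyed by batch index. */
  static Map<Long, List<String>> toBatches(List<Arrival> arrivals, long batchMillis) {
    Map<Long, List<String>> batches = new TreeMap<>();
    for (Arrival arrival : arrivals) {
      long batchIndex = arrival.millis / batchMillis;
      batches.computeIfAbsent(batchIndex, index -> new ArrayList<>()).add(arrival.text);
    }
    return batches;
  }

  public static void main(String[] args) {
    List<Arrival> arrivals = new ArrayList<>();
    arrivals.add(new Arrival(1_200, "first"));
    arrivals.add(new Arrival(9_999, "second"));
    arrivals.add(new Arrival(10_000, "third"));
    arrivals.add(new Arrival(27_500, "fourth"));
    // Prints one line per non-empty 10-second batch.
    toBatches(arrivals, 10_000).forEach(
        (index, texts) -> System.out.println("batch " + index + ": " + texts));
  }
}
```

With a 10-second interval, the first two messages land in the same batch, while the third and fourth each fall into a later one. A shorter interval lowers latency at the cost of more, smaller batches.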

Running the example

The example can be executed by running mvn test in the project directory. Twitter4j offers many more features; for example, it is possible to listen to status messages from specific users or in specific languages.