Maven dependencies

    <properties>
        <java.version>1.8</java.version>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <spark.version>2.4</spark.version>
        <spark.core.version>2.4.6</spark.core.version>
        <scala.version>2.11</scala.version>
        <scala.sdk.version>2.11.12</scala.sdk.version>
        <scope.version>provided</scope.version>
    </properties>

    <dependencies>

        <!-- https://mvnrepository.com/artifact/org.neo4j/neo4j-connector-apache-spark -->
        <dependency>
            <groupId>org.neo4j</groupId>
            <artifactId>neo4j-connector-apache-spark_2.11</artifactId>
            <version>4.1.3_for_spark_2.4</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_${scala.version}</artifactId>
            <version>${spark.core.version}</version>
            <!--<scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.version}</artifactId>
            <version>${spark.core.version}</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.12</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.sdk.version}</version>
            <scope>${scope.version}</scope>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-compiler</artifactId>
            <version>${scala.sdk.version}</version>
            <scope>${scope.version}</scope>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-reflect</artifactId>
            <version>${scala.sdk.version}</version>
            <scope>${scope.version}</scope>
        </dependency>

        <!-- https://mvnrepository.com/artifact/junit/junit -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.13.2</version>
            <scope>test</scope>
        </dependency>

    </dependencies>

    <build>
        <finalName>kafka-json-to-oracle</finalName>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <version>2.14.3</version>
                <executions>
                    <execution>
                        <id>scala-compile-first</id>
                        <phase>process-resources</phase>
                        <goals>
                            <goal>add-source</goal>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <phase>compile</phase>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <!-- The plugin expects the full Scala version, not the binary version -->
                    <scalaVersion>${scala.sdk.version}</scalaVersion>
                    <args>
                        <arg>-target:jvm-1.8</arg>
                    </args>
                </configuration>
            </plugin>
        </plugins>
    </build>

Writing nodes

import org.apache.spark.sql.{SaveMode, SparkSession}

object SparkNeo4jWrite2 {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().config("spark.master", "local[*]").getOrCreate()
    import spark.implicits._

    val dfInstrument = Seq(
      ("Piccolo", "短笛"),
      ("Flute", "长笛"),
      ("Soprano Recorder", "高音竖笛"),
      ("Oboe", "双簧管")
    ).toDF("name", "nameZh")

    // SaveMode.Overwrite together with node.keys makes the connector MERGE
    // nodes on the listed keys instead of blindly creating them.
    dfInstrument.write
      .format("org.neo4j.spark.DataSource")
      .mode(SaveMode.Overwrite)
      // A bolt URI takes no database path; select the database with the
      // "database" option instead.
      .option("url", "bolt://localhost:7687")
      .option("labels", "Instrument2")
      // Format "<DataFrame column>:<node property>"; a bare name means both
      // sides share it, so nameZh is written to the chineseName property.
      .option("node.keys", "name,nameZh:chineseName")
      .option("authentication.basic.username", "neo4j")
      .option("authentication.basic.password", "123456")
      .save()

  }

}
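
To verify the write, the same connector can read the nodes back by label (a minimal sketch; the connection details are assumed to match the write above):

import org.apache.spark.sql.SparkSession

object SparkNeo4jReadBack {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().config("spark.master", "local[*]").getOrCreate()

    // Read every node carrying the Instrument2 label written above.
    val dfInstrument = spark.read
      .format("org.neo4j.spark.DataSource")
      .option("url", "bolt://localhost:7687")
      .option("authentication.basic.username", "neo4j")
      .option("authentication.basic.password", "123456")
      .option("labels", "Instrument2")
      .load()

    dfInstrument.show()
  }

}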

Writing relationships

import org.apache.spark.sql.SparkSession

object SparkNeo4jRelationshipWrite2 {

  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder().config("spark.master", "local[*]").getOrCreate()
    import spark.implicits._

    val musicDf = Seq(
      (12, "John Bonham", "Drums"),
      (19, "John Mayer", "Guitar"),
      (32, "John Scofield", "Guitar"),
      (33, "John Scofield", "Guitar"),
      (15, "John Butler", "Guitar")
    ).toDF("experience", "name", "instrument")

    // In node.keys the key is the DataFrame column name and the value is the
    // node property name. This applies to relationship.source.node.keys and
    // relationship.target.node.keys, e.g. "instrument:name": instrument is
    // the Spark DataFrame column, name is the property on the Neo4j
    // Instrument node.
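    // For illustration: given the options set below, the row
    // (12, "John Bonham", "Drums") merges the source node
    // (:Musician {name: "John Bonham", experience: 12}) on its keys and
    // merges the target node (:Instrument {name: "Drums"}) on name.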

    musicDf.write
      .format("org.neo4j.spark.DataSource")
      .option("url", "bolt://localhost:7687")
      .option("relationship", "PLAYS")
      .option("relationship.save.strategy", "keys")
      .option("relationship.source.labels", ":Musician")
      .option("relationship.source.save.mode", "overwrite")
      .option("relationship.source.node.keys", "name:name,experience:experience")
      .option("relationship.target.labels", ":Instrument")
      .option("relationship.target.node.keys", "instrument:name")
      .option("relationship.target.save.mode", "overwrite")
      .option("authentication.basic.username", "neo4j")
      .option("authentication.basic.password", "123456")
      .save()

  }

}
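
The written relationships can be read back with the connector's relationship read options (a minimal sketch, assuming the same local connection; the columns follow the connector's relationship read schema):

import org.apache.spark.sql.SparkSession

object SparkNeo4jRelationshipRead {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().config("spark.master", "local[*]").getOrCreate()

    // Read PLAYS relationships together with their source and target nodes.
    val df = spark.read
      .format("org.neo4j.spark.DataSource")
      .option("url", "bolt://localhost:7687")
      .option("authentication.basic.username", "neo4j")
      .option("authentication.basic.password", "123456")
      .option("relationship", "PLAYS")
      .option("relationship.source.labels", ":Musician")
      .option("relationship.target.labels", ":Instrument")
      .load()

    df.show()
  }

}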

Deleting relationships before writing

I have not found a de-duplication mechanism for relationships that get written repeatedly, so this example deletes the matching relationships before writing them again (see also the dropDuplicates sketch after the example).

import org.apache.spark.sql.functions.{col, substring_index}
import org.apache.spark.sql.{DataFrame, SparkSession}

object SparkNeo4jRelationshipWrite {

  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder().config("spark.master", "local[*]").getOrCreate()
    import spark.implicits._

    val musicDf: DataFrame = Seq(
      (12, "Person/John Bonham", "John Mayer"),
      (19, "John Mayer", "John Scofield"),
      (32, "John Scofield", "John Butler"),
      (32, "Chandler", "Jim"),
      (15, "John Butler", "John Bonham")
    ).toDF("experience", "name", "friend")

    // As above, node.keys entries map "<DataFrame column>:<node property>"
    // for relationship.source.node.keys and relationship.target.node.keys
    // (e.g. "instrument:name"). An alternative cleanup would be
    // regexp_replace(col("name"), ",", "").

    // Custom column transformation: keep only the part after the last "/",
    // e.g. "Person/John Bonham" -> "John Bonham".
    val musicDf1: DataFrame = musicDf
      .withColumn("name2", substring_index(col("name"), "/", -1))

    musicDf1.printSchema()

    musicDf1.show()

    // Delete matching IS_FRIENDS_WITH relationships before re-writing them.
    // With the "query" write option, each DataFrame row is available to the
    // Cypher statement as `event`.
    musicDf1.write.format("org.neo4j.spark.DataSource")
      .option("url", "bolt://localhost:7687")
      .option("authentication.basic.username", "neo4j")
      .option("authentication.basic.password", "123456")
      .option("query", "MATCH (p:People)-[r:IS_FRIENDS_WITH]-(p1:People) WHERE r.name2 = event.name2 DELETE r")
      .save()

    // Re-create the relationships, writing experience and name2 onto the
    // relationship itself via relationship.properties.
    musicDf1.write
      .format("org.neo4j.spark.DataSource")
      .option("url", "bolt://localhost:7687")
      .option("relationship", "IS_FRIENDS_WITH")
      // .option("relationship.properties.keys", "name")
      .option("relationship.properties", "experience,name2")
      .option("relationship.save.strategy", "keys")
      .option("relationship.source.labels", ":People")
      .option("relationship.source.save.mode", "Overwrite")
      .option("relationship.source.node.keys", "name:name")
      .option("relationship.target.labels", ":People")
      .option("relationship.target.node.keys", "friend:name")
      .option("relationship.target.save.mode", "Overwrite")
      .option("authentication.basic.username", "neo4j")
      .option("authentication.basic.password", "123456")
      // .option("schema.optimization.type", "NODE_CONSTRAINTS")
      .save()


  }

}
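
On the unresolved de-duplication above: within a single run, duplicate rows can at least be dropped on the relationship key columns before writing (a sketch using this example's column names; it does not remove duplicates left by earlier runs, which is what the DELETE step handles):

import org.apache.spark.sql.{DataFrame, SparkSession}

object DedupBeforeWrite {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().config("spark.master", "local[*]").getOrCreate()
    import spark.implicits._

    val musicDf: DataFrame = Seq(
      (32, "John Scofield", "John Butler"),
      (32, "John Scofield", "John Butler") // duplicate row
    ).toDF("experience", "name", "friend")

    // Keep one row per (name, friend) pair, so a single write cannot
    // create the same IS_FRIENDS_WITH relationship twice.
    val deduped = musicDf.dropDuplicates("name", "friend")
    deduped.show()
  }

}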

Reading data

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class SparkNeo4jMain {

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().config("spark.master", "local[*]").appName("spark-neo4j-test")
                // .config("spark.sql.shuffle.partitions", 100L)
                .getOrCreate();
        // final JavaSparkContext javaSparkContext = new JavaSparkContext(spark.sparkContext());

        Dataset<Row> ds = spark.read().format("org.neo4j.spark.DataSource")
                .option("url", "bolt://localhost:7687")
                // .option("url", "neo4j://localhost:7687")
                .option("authentication.basic.username", "neo4j")
                .option("authentication.basic.password", "123456")
                // The People nodes above live in the default database,
                // not in "system".
                .option("database", "neo4j")
                .option("labels", "People")
                .load();

        try {
            ds.show();
        } catch (Exception e) {
            e.printStackTrace();
        }

    }

}
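
Besides reading by label, the connector also accepts a Cypher query on read, with each RETURN column becoming a DataFrame column (a minimal Scala sketch, assuming the friendship data written above):

import org.apache.spark.sql.SparkSession

object SparkNeo4jQueryRead {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().config("spark.master", "local[*]").getOrCreate()

    // Arbitrary Cypher on read; each returned column becomes a DataFrame column.
    val df = spark.read
      .format("org.neo4j.spark.DataSource")
      .option("url", "bolt://localhost:7687")
      .option("authentication.basic.username", "neo4j")
      .option("authentication.basic.password", "123456")
      .option("query", "MATCH (p:People)-[:IS_FRIENDS_WITH]->(f:People) RETURN p.name AS name, f.name AS friend")
      .load()

    df.show()
  }

}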


References:

https://neo4j.com/docs/spark/4.0/writing/

Previous post: https://snailgary.org/9c2d6b5842fa90afb1960b2cf302d84e