Skip to content
Snippets Groups Projects
Commit 7b85e086 authored by Manish A.Shetty's avatar Manish A.Shetty
Browse files

word splitter

parent 5e24fe79
No related branches found
No related tags found
No related merge requests found
.DS_Store
*.iml
.idea
target/
mvn compile exec:java -Dexec.mainClass=com.example.dataflow.StarterPipeline -Dexec.cleanupDaemonThreads=false -Dexec.args=" --project=tvs-telematics-dev --stagingLocation=gs://tvs-telematics-dev/staging/ --tempLocation=gs://tvs-telematics-dev/temp --runner=DataflowRunner"
# wordcount
PipelineStarter
===============
Send alarm data from kafka to CLOUDSQL
images/allow_all_api_access.png

46.8 KiB

images/create_key.png

100 KiB

images/maxlaneflow_elements.png

31.6 KiB

hello world
jaza!
\ No newline at end of file
images/step_autoscaled.png

19.8 KiB

images/step_elements.png

19.4 KiB

images/workers.png

32.9 KiB

me.txt 0 → 100644
hello world
jaza!
\ No newline at end of file
pom.xml 0 → 100644
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example.dataflow</groupId>
<artifactId>my-dataflow-project</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<repositories>
<repository>
<id>ossrh.snapshots</id>
<name>Sonatype OSS Repository Hosting</name>
<url>https://oss.sonatype.org/content/repositories/snapshots/</url>
<releases>
<enabled>false</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
</repositories>
<packaging>jar</packaging>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.5.1</version>
<configuration>
<source>1.7</source>
<target>1.7</target>
</configuration>
</plugin>
</plugins>
<pluginManagement>
<plugins>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<version>1.4.0</version>
<configuration>
<cleanupDaemonThreads>false</cleanupDaemonThreads>
</configuration>
</plugin>
</plugins>
</pluginManagement>
</build>
<dependencies>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
<version>2.40.0</version>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
<version>2.40.0</version>
<!-- <exclusions>
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</exclusion>
</exclusions>-->
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-direct-java</artifactId>
<version>2.43.0</version>
</dependency>
<dependency>
<groupId>org.json</groupId>
<artifactId>json</artifactId>
<version>20190722</version>
</dependency>
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
<version>2.5</version>
</dependency>
</dependencies>
</project>
package com.example.dataflow;
import org.apache.beam.sdk.transforms.DoFn;
class Splitter extends DoFn<String, String>
{
@ProcessElement
public void processElement(ProcessContext c)
{
// Split the line into words.
System.out.println("Entered processelement");
String[] words = c.element().split("[^a-zA-Z']+");
// Output each word encountered into the output PCollection.
for (String word : words)
{
if (!word.isEmpty())
{
System.out.println("Executing");
c.output(word);
}
}
}
}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.example.dataflow;
import org.apache.beam.repackaged.core.org.apache.commons.lang3.RandomStringUtils;
import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.ParDo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
public class StarterPipelineSQL
{
private static final Logger logger = LoggerFactory.getLogger(StarterPipelineSQL.class);
private static Pipeline createPipeline()
{
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
options.setRunner(DataflowRunner.class);
// GoogleCredentials credentials= GoogleCredentials.fromStream(new FileInputStream("E:\\Authorization.json")).toBuilder().build();
// options.setGcpCredential(credentials);
options.setProject("maximal-chemist-355505");
options.setJobName("mydataflow2" + RandomStringUtils.randomNumeric(3));
options.setStagingLocation("gs://innovators-hive/staging/");
options.setGcpTempLocation("gs://innovators-hive/staging/");
options.setNetwork("kafka-telem");
options.setSubnetwork("https://www.googleapis.com/compute/v1/projects/maximal-chemist-355505/regions/asia-south1/subnetworks/training-kafka");
options.setMaxNumWorkers(5);
options.setWorkerMachineType("n1-standard-2");
options.setRegion("asia-south1");
options.setStreaming(false);
return (Pipeline.create(options));
}
public static void main(String[] args) throws IOException
{
Pipeline p = createPipeline();
p
.apply("ReadLines", TextIO.read().from("gs://havvatrain/me.txt"))
.apply("Word Splitter", ParDo.of(new Splitter()))
.apply("Write GCS", TextIO.write().to("gs://innovators-hive/havaa/"));
// .apply("iTriangleConverter", ParDo.of(new Splitter()))
// .apply("WriteCount", TextIO.write().to("gs://innovators-hive/havaa/"));
System.out.println("About to run now");
p.run();
}
}
hello world
jaza!
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment