前言:
而今小伙伴们对“apache处理post请求”都比较关怀,各位老铁们都需要学习一些“apache处理post请求”的相关资讯。那么小编在网摘上汇集了一些对于“apache处理post请求””的相关资讯,希望大家能喜欢,兄弟们快快来学习一下吧!需求
在java中httpServer是非常普遍的一个功能,前端发出请求,后端根据url 以及参数做出相应的处理,返回数据,有get请求 post请求。
现在我需要flink实现这样的功能,http请求flink的source,sink进行处理并返回结果
最终实现结果截图如下实现代码如下Job代码
package Job;import flink.HttpSource;import org.apache.flink.streaming.api.datastream.DataStreamSource;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.slf4j.Logger;import org.slf4j.LoggerFactory;public class Job { public static Logger logger = LoggerFactory.getLogger(Job.class); public static void main(String[] args) throws Exception { //1、获取流处理的环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); DataStreamSource source = env.addSource(new HttpSource()); source.addSink(new Sink()); env.execute("http计算"); }}HttpSource代码
监听端口数据
package flink;import org.apache.flink.configuration.Configuration;import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.io.BufferedReader;import java.io.IOException;import java.io.InputStreamReader;import java.net.ServerSocket;import java.net.Socket;public class HttpSource extends RichParallelSourceFunction<String> { public static ServerSocket serverSocket = null; public static Socket socket = null; public static BufferedReader bufferedReader =null; public static Logger logger = LoggerFactory.getLogger(HttpSource.class); @Override public void open(Configuration parameters) throws Exception { int port = 8200; try { //1、开启端口 serverSocket = new ServerSocket(port); } catch (IOException e) { logger.error("open port error "+e.getMessage()); } } @Override public void run(SourceContext<String> ctx) throws Exception { while (true) { try { logger.info("serverSocket accept "); socket = serverSocket.accept(); logger.info("serverSocket bufferedReader "); BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream()));//获取输入流(请求) StringBuilder stringBuilder = new StringBuilder(); String line = null; Thread.sleep(500); //得到请求的内容,注意这里作两个判断非空和""都要,只判断null会有问题 logger.info("serverSocket readLine "); while ((line = bufferedReader.readLine()) != null && !line.equals("")) { stringBuilder.append(line); } //过滤掉某些请求头之类的 String result = stringBuilder.toString().split(" ")[1]; ctx.collect(result); } catch (Exception e) { logger.error("HttpSource "+e.getMessage()); continue; }finally { if(bufferedReader!=null){ bufferedReader.close(); } } } } @Override public void cancel() { }}注意一个地方
正常情况下,前端进行请求,会返回请求方式 请求路径以及请求参数 后面会有浏览器整个信息,非常多,我这里做了处理,只获取请求路径以及请求参数
Sink类
package Job;import Server.httpRequest;import Server.httpResponse;import com.alibaba.fastjson.JSONObject;import flink.HttpSource;import org.apache.flink.configuration.Configuration;import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;import org.slf4j.Logger;import org.slf4j.LoggerFactory;public class Sink extends RichSinkFunction<String> { public static Logger logger=LoggerFactory.getLogger(Sink.class); public void open(Configuration parameters){ } @Override public void invoke(String result, Context context){ //1、处理数据,过滤干扰因素 /hello?name=dde JSONObject jsonObject = new httpRequest().analysisContent(result); //2、获取url if(jsonObject.containsKey("url")){ //3、根据不同的url做出不同的处理 if(jsonObject.getString("url").equals("/sub")){ if(jsonObject.containsKey("params")){ System.out.println("hello "+result); new httpResponse().sendsg(HttpSource.socket,jsonObject.getJSONObject("params")); } } if(jsonObject.getString("url").equals("/add")){ if(jsonObject.containsKey("params")){ System.out.println("fail "+result); new httpResponse().sendsg1(HttpSource.socket,jsonObject.getJSONObject("params")); } } } }}对不同的url进行不同的处理 httpResponse类
package Server;import com.alibaba.fastjson.JSONObject;import java.io.IOException;import java.io.PrintWriter;import java.net.Socket;public class httpResponse { public void sendsg(Socket socket, JSONObject msg){ PrintWriter printWriter = null;//这里第二个参数表示自动刷新缓存 try { printWriter = new PrintWriter( socket.getOutputStream(), true); } catch (IOException e) { System.out.println(""+e.getMessage()); } printWriter.println("HTTP/1.1 200 OK"); printWriter.println("Content-Type:text/html;charset=utf-8"); printWriter.println(); float a=0; float b=0; float c=0; if(msg.containsKey("a")){ a= msg.getFloat("a"); } if(msg.containsKey("b")){ b= msg.getFloat("b"); } if(msg.containsKey("c")){ c=msg.getFloat("c"); } c=a*2+b-c; printWriter.write(c+"");//将日志输出到浏览器 printWriter.close(); } public void sendsg1(Socket socket, JSONObject msg){ PrintWriter printWriter = null;//这里第二个参数表示自动刷新缓存 try { printWriter = new PrintWriter( socket.getOutputStream(), true); } catch (IOException e) { System.out.println(""+e.getMessage()); } printWriter.println("HTTP/1.1 200 OK"); printWriter.println("Content-Type:text/html;charset=utf-8"); printWriter.println(); float a=0; float b=0; float c=0; if(msg.containsKey("a")){ a= msg.getFloat("a"); } if(msg.containsKey("b")){ b= msg.getFloat("b"); } if(msg.containsKey("c")){ c=msg.getFloat("c"); } c=a+b+c; printWriter.write(c+"");//将日志输出到浏览器 printWriter.close(); }}对获取的参数进行过滤筛选
package Server;import com.alibaba.fastjson.JSONObject;public class httpRequest { public JSONObject analysisContent(String result){ JSONObject obj=new JSONObject(); //1、过滤只留下url 和请求参数 值 String [] contents=result.split("\\?"); if(contents.length>1){ obj.put("url",contents[0]); JSONObject keyObject=new JSONObject(); String [] params=contents[1].split("&"); for(int i=0;i<params.length;i++){ String [] map= params[i].split("="); keyObject.put(map[0],map[1]); } obj.put("params",keyObject); } return obj; }}打包成jar包在flink上运行测试运行15m以后是否有效
不同的url返回不同的处理结果,完美
感想
很早接触flink 非常的排斥,然后很多都是借用工具的,如果没有开发好的轮子,需要自己造的,就会非常排斥,比如这个功能,我就搞了很久,之前搞的版本是springboot+flink的方式,纯jar包,脱离了集群,我感觉那种方式跟搞着玩一样。
昨天看了一篇文章穷爸爸富爸爸,讲了穷人思维,突然觉得自己有必要思考,结果2h不到,就搞出来了,改变思维方式,不然会越忙越穷
标签: #apache处理post请求