说明
Java从一开始就以手写RPC (01)为web套接字实现基础
Java从0开始用手写RPC (02)-netty4来实现客户端和服务器端
客户端和服务器端写完之后,如何实现客户端和服务器端的调用?
现在一起看一下。
接口定义
计算方法
package com.gi;
import com.gi;
import com.gi;
/**
* <p> 计算服务接口 </p>
*
* <pre> Created: 2018/8/24 下午4:47 </pre>
* <pre> Project: fake </pre>
*
* @author houbinbin
* @since 0.0.1
*/
public interface Calculator {
/**
* 计算加法
* @param request 请求入参
* @return 返回结果
*/
CalculateResponse sum(final CalculateRequest request);
}
pojo
对应的参数对象:
•CalculateRequest
package com.gi;
import java.io.Serializable;
/**
* <p> 请求入参 </p>
*
* <pre> Created: 2018/8/24 下午5:05 </pre>
* <pre> Project: fake </pre>
*
* @author houbinbin
* @since 0.0.3
*/
public class CalculateRequest implements Serializable {
private static final long serialVersionUID = 6420751004355300996L;
/**
* 参数一
*/
private int one;
/**
* 参数二
*/
private int two;
public CalculateRequest() {
}
public CalculateRequest(int one, int two) {
= one;
= two;
}
//getter setter toString
}
•CalculateResponse
package com.gi;
import java.io.Serializable;
/**
* <p> 请求入参 </p>
*
* <pre> Created: 2018/8/24 下午5:05 </pre>
* <pre> Project: fake </pre>
*
* @author houbinbin
* @since 0.0.3
*/
public class CalculateResponse implements Serializable {
private static final long serialVersionUID = -19720341L;
/**
* 是否成功
*/
private boolean success;
/**
* 二者的和
*/
private int sum;
public CalculateResponse() {
}
public CalculateResponse(Boolean success, int sum) {
= success;
= sum;
}
//getter setter toString
}
客户端
核心部分
RpcClient 需要添加对应的 Handler,调整如下:
Bootstrap bootstrap = new Bootstrap();
ChannelFuture channelFuture = boo(workerGroup)
.channel)
.option, true)
.handler(new ChannelInitializer<Channel>(){
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline()
.addLast(new LoggingHandler))
.addLast(new CalculateRequestEncoder())
.addLast(new CalculateResponseDecoder())
.addLast(new RpcClientHandler());
}
})
.connec, port)
.syncUninterruptibly();
netty 中的 handler 泳道设计的非常优雅,让我们的代码可以非常灵活地进行拓展。
接下来我们看一下对应的实现。
RpcClientHandler
package com.gi;
import com.gi;
import com.giFactory;
import com.gi;
import com.gi;
import com.gi;
import io.ne;
import io.ne;
/**
* <p> 客户端处理类 </p>
*
* <pre> Created: 2019/10/16 11:30 下午 </pre>
* <pre> Project: rpc </pre>
*
* @author houbinbin
* @since 0.0.2
*/
public class RpcClientHandler extends SimpleChannelInboundHandler {
private static final Log log = LogFac);
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
CalculateRequest request = new CalculateRequest(1, 2);
c(request);
log.info("[Client] request is :{}", request);
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
CalculateResponse response = (CalculateResponse)msg;
log.info("[Client] response is :{}", response);
}
}
这里比较简单,channelActive 中我们直接发起调用,入参的对象为了简单,此处固定写死。
channelRead0 中监听服务端的相应结果,并做日志输出。
CalculateRequestEncoder
请求参数是一个对象,netty 是无法直接传输的,我们将其转换为基本对象:
package com.gi;
import com.gi;
import io.ne;
import io.ne;
import io.ne;
/**
* @author binbin.hou
* @since 0.0.3
*/
public class CalculateRequestEncoder extends MessageToByteEncoder<CalculateRequest> {
@Override
protected void encode(ChannelHandlerContext ctx, CalculateRequest msg, ByteBuf out) throws Exception {
int one = m();
int two = m();
out.writeInt(one);
out.writeInt(two);
}
}
CalculateResponseDecoder
针对服务端的响应,也是同理。
我们需要把基本的类型,封装转换为我们需要的对象。
package com.gi;
import com.gi;
import io.ne;
import io.ne;
import io.ne;
import java.u;
/**
* 响应参数解码
* @author binbin.hou
* @since 0.0.3
*/
public class CalculateResponseDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
boolean success = in.readBoolean();
int sum = in.readInt();
CalculateResponse response = new CalculateResponse(success, sum);
out.add(response);
}
}
服务端
设置处理类
RpcServer 中的处理类要稍微调整一下,其他的保持不变。
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBoo(workerGroup, bossGroup)
.channel)
// 打印日志
.handler(new LoggingHandler))
.childHandler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline()
.addLast(new CalculateRequestDecoder())
.addLast(new CalculateResponseEncoder())
.addLast(new RpcServerHandler());
}
})
// 这个参数影响的是还没有被accept 取出的连接
.option, 128)
// 这个参数只是过一段时间内客户端没有响应,服务端会发送一个 ack 包,以判断客户端是否还活着。
.childOption, true);
RpcServerHandler
一开始这里是空实现,我们来添加一下对应的实现。
package com.gi;
import com.gi;
import com.giFactory;
import com.gi;
import com.gi;
import com.gi.Calculator;
import com.gi;
import io.ne;
import io.ne;
/**
* @author binbin.hou
* @since 0.0.1
*/
public class RpcServerHandler extends SimpleChannelInboundHandler {
private static final Log log = LogFac);
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
final String id = c().id().asLongText();
log.info("[Server] channel {} connected " + id);
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
final String id = c().id().asLongText();
CalculateRequest request = (CalculateRequest)msg;
log.info("[Server] receive channel {} request: {} from ", id, request);
Calculator calculator = new CalculatorService();
CalculateResponse response = calcula(request);
// 回写到 client 端
c(response);
log.info("[Server] channel {} response {}", id, response);
}
}
读取到客户端的访问之后,我们获取到计算的入参 CalculateRequest,然后调用 sum 方法,获取到对应的 CalculateResponse,将结果通知客户端。
CalculateRequestDecoder
这里和客户端是一一对应的,我们首先把 netty 传递的基本类型转换为 CalculateRequest 对象。
package com.gi;
import com.gi;
import io.ne;
import io.ne;
import io.ne;
import java.u;
/**
* 请求参数解码
* @author binbin.hou
* @since 0.0.3
*/
public class CalculateRequestDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
int one = in.readInt();
int two = in.readInt();
CalculateRequest request = new CalculateRequest(one, two);
out.add(request);
}
}
CalculateResponseEncoder
这里和客户端类似,我们需要把 response 转换为基本类型进行网络传输。
package com.gi;
import com.gi;
import io.ne;
import io.ne;
import io.ne;
/**
* @author binbin.hou
* @since 0.0.3
*/
public class CalculateResponseEncoder extends MessageToByteEncoder<CalculateResponse> {
@Override
protected void encode(ChannelHandlerContext ctx, CalculateResponse msg, ByteBuf out) throws Exception {
boolean success = m();
int result = m();
out.writeBoolean(success);
out.writeInt(result);
}
}
CalculatorService
服务端对应的实现类。
public class CalculatorService implements Calculator {
@Override
public CalculateResponse sum(CalculateRequest request) {
int sum = reque()+reque();
return new CalculateResponse(true, sum);
}
}
测试
服务端
启动服务端:
new RpcServer().start();
服务端启动日志:
[DEBUG] [2021-10-05 11:53:11.795] [main] [c.g.h.l.i.c.LogFac] - Logging initialized using 'class com.gi; adapter.
[INFO] [2021-10-05 11:53:11.807] [Thread-0] [c.g.h.r.s.c.R] - RPC 服务开始启动服务端
十月 05, 2021 11:53:13 上午 io.ne channelRegistered
信息: [id: 0xd399474f] REGISTERED
十月 05, 2021 11:53:13 上午 io.ne bind
信息: [id: 0xd399474f] BIND: 0.0.0.0
十月 05, 2021 11:53:13 上午 io.ne channelActive
信息: [id: 0xd399474f, L:/0:0:0:0:0:0:0:0:9527] ACTIVE
[INFO] [2021-10-05 11:53:13.101] [Thread-0] [c.g.h.r.s.c.R] - RPC 服务端启动完成,监听【9527】端口
客户端
启动客户端:
new RpcClient().start();
日志如下:
[DEBUG] [2021-10-05 11:54:12.158] [main] [c.g.h.l.i.c.LogFac] - Logging initialized using 'class com.gi; adapter.
[INFO] [2021-10-05 11:54:12.164] [Thread-0] [c.g.h.r.c.c.R] - RPC 服务开始启动客户端
十月 05, 2021 11:54:13 上午 io.ne channelRegistered
信息: [id: 0x4d75c580] REGISTERED
十月 05, 2021 11:54:13 上午 io.ne connect
信息: [id: 0x4d75c580] CONNECT:
十月 05, 2021 11:54:13 上午 io.ne channelActive
信息: [id: 0x4d75c580, L:/127.0.0.1:54030 - R:] ACTIVE
[INFO] [2021-10-05 11:54:13.403] [Thread-0] [c.g.h.r.c.c.R] - RPC 服务启动客户端完成,监听端口:9527
十月 05, 2021 11:54:13 上午 io.ne write
信息: [id: 0x4d75c580, L:/127.0.0.1:54030 - R:] WRITE: 8B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 00 00 00 01 00 00 00 02 |........ |
+--------+-------------------------------------------------+----------------+
十月 05, 2021 11:54:13 上午 io.ne flush
信息: [id: 0x4d75c580, L:/127.0.0.1:54030 - R:] FLUSH
[INFO] [2021-10-05 11:54:13.450] [nioEventLoopGroup-2-1] [c.g.h.r.c.c.R] - [Client] request is :CalculateRequest{one=1, two=2}
十月 05, 2021 11:54:13 上午 io.ne channelRead
信息: [id: 0x4d75c580, L:/127.0.0.1:54030 - R:] READ: 5B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 00 00 00 03 |..... |
+--------+-------------------------------------------------+----------------+
十月 05, 2021 11:54:13 上午 io.ne channelReadComplete
信息: [id: 0x4d75c580, L:/127.0.0.1:54030 - R:] READ COMPLETE
[INFO] [2021-10-05 11:54:13.508] [nioEventLoopGroup-2-1] [c.g.h.r.c.c.R] - [Client] response is :CalculateResponse{success=true, sum=3}
可以看到,输出了对应的请求参数和响应结果。
当然,此时服务端也有对应的新增日志:
十月 05, 2021 11:54:13 上午 io.ne channelRead
信息: [id: 0xd399474f, L:/0:0:0:0:0:0:0:0:9527] READ: [id: 0xbc9f5927, L: - R:/127.0.0.1:54030]
十月 05, 2021 11:54:13 上午 io.ne channelReadComplete
信息: [id: 0xd399474f, L:/0:0:0:0:0:0:0:0:9527] READ COMPLETE
[INFO] [2021-10-05 11:54:13.432] [nioEventLoopGroup-2-1] [c.g.h.r.s.h.R] - [Server] channel {} connected 00e04cfffe360988-00001d34-00000001-2a80d950d8166c0c-bc9f5927
[INFO] [2021-10-05 11:54:13.495] [nioEventLoopGroup-2-1] [c.g.h.r.s.h.R] - [Server] receive channel 00e04cfffe360988-00001d34-00000001-2a80d950d8166c0c-bc9f5927 request: CalculateRequest{one=1, two=2} from
[INFO] [2021-10-05 11:54:13.505] [nioEventLoopGroup-2-1] [c.g.h.r.s.h.R] - [Server] channel 00e04cfffe360988-00001d34-00000001-2a80d950d8166c0c-bc9f5927 response CalculateResponse{success=true, sum=3}
小结
为了便于大家学习,以上源码已经开源:
希望本文对你有所帮助,如果喜欢,欢迎点赞收藏转发一波。
我是老马,期待与你的下次相遇。
1.文章《客户端是什么、微信客户端是什么!》援引自互联网,为网友投稿收集整理,仅供学习和研究使用,内容仅代表作者本人观点,与本网站无关,侵删请点击页脚联系方式。
2.文章《客户端是什么、微信客户端是什么!》仅供读者参考,本网站未对该内容进行证实,对其原创性、真实性、完整性、及时性不作任何保证。
相关推荐
- . 现代买票为什么带上携程保险
- . 潮阳怎么去广州南站
- . 湖南马拉河怎么样
- . 烧纸为什么到三岔路口
- . 百色为什么这么热
- . 神州租车怎么样
- . 芜湖方特哪个适合儿童
- . 护肤品保养液是什么类目
- . 早晚的护肤保养有哪些项目
- . 女孩护肤品怎么保养的最好