diff --git a/.gitignore b/.gitignore index 9ed3b07..ada62b0 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,2 @@ *.test +.idea diff --git a/avro_flupe.avrp b/avro_flupe.avrp new file mode 100644 index 0000000..ca855dc --- /dev/null +++ b/avro_flupe.avrp @@ -0,0 +1,56 @@ +{ + "protocol":"AvroSourceProtocol", + "namespace":"org.apache.flume.source.avro", + "doc":"* Licensed to the Apache Software Foundation (ASF).", + "types":[ + { + "type":"enum", + "name":"Status", + "symbols":[ + "OK", + "FAILED", + "UNKNOWN" + ] + }, + { + "type":"record", + "name":"AvroFlumeEvent", + "fields":[ + { + "name":"headers", + "type":{ + "type":"map", + "values":"string" + } + }, + { + "name":"body", + "type":"bytes" + } + ] + } + ], + "messages":{ + "append":{ + "request":[ + { + "name":"event", + "type":"AvroFlumeEvent" + } + ], + "response":"Status" + }, + "appendBatch":{ + "request":[ + { + "name":"events", + "type":{ + "type":"array", + "items":"AvroFlumeEvent" + } + } + ], + "response":"Status" + } + } +} diff --git a/encoder_test.go b/encoder_test.go new file mode 100644 index 0000000..32d37f0 --- /dev/null +++ b/encoder_test.go @@ -0,0 +1,41 @@ +package goavro + +import ( + "testing" + "bytes" +) + +const testSchema= ` +{ + "type": "record", + "name": "test", + "fields" : [ + {"name": "a", "type": "long"}, + {"name": "b", "type": "string"} + ] +} +` +func TestEncode(t *testing.T) { + record,err := NewRecord(RecordSchema(testSchema)) + if err != nil { + t.Fatal(err) + } + record.Set("a",int64(27)) + record.Set("b","foo") + + codec, err := NewCodec(testSchema) + if err != nil { + t.Fatal(err) + } + + bb := new(bytes.Buffer) + if err = codec.Encode(bb, record); err !=nil { + t.Fatal(err) + } + actual := bb.Bytes() + expected := []byte("\x36\x06\x66\x6f\x6f") + + if bytes.Compare(actual, expected) != 0 { + t.Errorf("Actual: %#v; Expected: %#v", actual, expected) + } +} diff --git a/examples/flume/client.go b/examples/flume/client.go new file mode 100644 index 0000000..80684c4 --- /dev/null +++ b/examples/flume/client.go @@ -0,0 +1,50 @@ +package main +import ( + "github.com/sebglon/goavro" + "log" + "github.com/sebglon/goavro/transceiver/netty" + "time" +) + +func main() { + //t.SkipNow() + transceiver,err := netty.NewTransceiver(netty.Config{AsyncConnect:false, NettyHost:"192.168.11.152"}) + if err != nil { + log.Fatal(err) + } + protocol, err := goavro.NewProtocol() + if err != nil { + log.Fatal(err) + } + + flumeRecord, errFlume := protocol.NewRecord("AvroFlumeEvent") + if errFlume != nil { + log.Fatal(errFlume) + } + headers := make(map[string]interface{}) + headers["host_header"] = "127.0.0.1" + flumeRecord.Set("headers", headers) + + requestor := goavro.NewRequestor(protocol, transceiver) + + flumeRecord.Set("body", []byte("test 1")) + err = requestor.Request("append", flumeRecord) + + if err != nil { + log.Fatal("Request 1: ", err) + } + + log.Printf("Test 1 OK") + + + time.Sleep(5 * time.Second) + flumeRecord.Set("body", []byte("test 2")) + err = requestor.Request("append", flumeRecord) + + if err != nil { + log.Fatal("Request 2: ", err) + } + log.Printf("Test 2 OK") + +} + diff --git a/protocol.go b/protocol.go new file mode 100644 index 0000000..df62a46 --- /dev/null +++ b/protocol.go @@ -0,0 +1,155 @@ +package goavro + +import ( + "crypto/md5" + "encoding/json" + "fmt" +) + +var TYPES_CACHE map[string]ProtocolType + +type Protocol struct { + Name string `json:"protocol"` + Namespace string `json:"namespace"` + Fullname string `json:"-"` + Doc string `json:"doc"` + Types []AbsType `json:"types"` + Messages map[string]ProtocolMessage `json:"messages"` + MD5 []byte `json:"-"` +} + +type ProtocolType struct { + TypeX string `json:"type"` + Name string `json:"name,omitempty"` + Symbols []string `json:"symbols,omitempty"` + Fields []Field `json:"fields,omitempty"` + Values string `json:"values,omitempty"` + Items string `json:"items,omitempty"` +} + +type Field struct{ + Name string `json:"name"` + TypeX AbsType `json:"type"` +} + +type AbsType struct { + *ProtocolType + ref string `json:"-"` +} + +type ProtocolMessage struct { + Name string `json:"-"` + Doc string `json:"doc,omitempty"` + Request []Field `json:"request"` + Response string `json:"response"` + Errors []string `json:"errors,omitempty"` + One_way bool `json:"one-way,omitempty"` +} + +const proto = ` +{"protocol":"AvroSourceProtocol","namespace":"org.apache.flume.source.avro","doc":"* Licensed to the Apache Software Foundation (ASF) under one\n * or more contributor license agreements. See the NOTICE file\n * distributed with this work for additional information\n * regarding copyright ownership. The ASF licenses this file\n * to you under the Apache License, Version 2.0 (the\n * \"License\"); you may not use this file except in compliance\n * with the License. You may obtain a copy of the License at\n *\n * http://www.apache.org/licenses/LICENSE-2.0\n *\n * Unless required by applicable law or agreed to in writing,\n * software distributed under the License is distributed on an\n * \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n * KIND, either express or implied. See the License for the\n * specific language governing permissions and limitations\n * under the License.","types":[{"type":"enum","name":"Status","symbols":["OK","FAILED","UNKNOWN"]},{"type":"record","name":"AvroFlumeEvent","fields":[{"name":"headers","type":{"type":"map","values":"string"}},{"name":"body","type":"bytes"}]}],"messages":{"append":{"request":[{"name":"event","type":"AvroFlumeEvent"}],"response":"Status"},"appendBatch":{"request":[{"name":"events","type":{"type":"array","items":"AvroFlumeEvent"}}],"response":"Status"}}} +` + +func init() { + TYPES_CACHE = make(map[string]ProtocolType) + TYPES_CACHE["bytes"] = ProtocolType{Name:"bytes"} + TYPES_CACHE["enum"] = ProtocolType{Name:"enum"} + TYPES_CACHE["record"] = ProtocolType{Name:"record"} +} +func (t *AbsType) UnmarshalJSON(data []byte) error { + var nameType string + var protocolType ProtocolType + if err := json.Unmarshal(data, &nameType); err==nil { + protoType, ok := TYPES_CACHE[nameType] + if ok { + t.ref = nameType + protocolType = protoType + } else { + return fmt.Errorf("Type %s not found on protocol type cache %#v", data, TYPES_CACHE) + } + } else if err := json.Unmarshal(data, &protocolType); err!=nil { + return fmt.Errorf("Fail to Parse AbsType, %s %s", data,err ) + } + t.ProtocolType = &protocolType + TYPES_CACHE[protocolType.Name] = protocolType + return nil +} + +func (t *AbsType) MarshalJSON()([]byte, error) { + if len(t.ref)>0 { + return json.Marshal(t.ref) + } else { + return json.Marshal(t.ProtocolType) + } +} + +func NewProtocol() (Protocol, error) { + var result Protocol + err := json.Unmarshal([]byte(proto), &result) + + if err!=nil { + return result, err + } + + if len(result.Name)==0 { + err = fmt.Errorf("Protocol must have a non-empty name.") + } else if len(result.Namespace) == 0 { + err = fmt.Errorf("The namespace property must be a string.") + } + result.Fullname = result.Namespace +"." + result.Name + + bb, err := json.Marshal(result) + + if err!=nil { + return result, err + } + hash := md5.Sum(bb) + result.MD5 = hash[:] + return result, err +} + +func (p *Protocol) Json() (string, error) { + var result string + bb, err := json.Marshal(p) + if err != nil { + return result, err + + } + return string(bb), nil +} + + +func (p *Protocol) MessageResponseCodec(messageName string) (Codec, error) { + json, err := p.MessageResponseJson(messageName) + if err!= nil { + return nil, err + } + return NewCodec(json) +} +func (p *Protocol) MessageResponseJson(messageName string) (string, error) { + field := p.Messages[messageName].Response + avroType := TYPES_CACHE[field] + json, err := json.Marshal(avroType) + return string(json), err +} +func (p *Protocol) MessageRequestCodec(messageName string) (Codec, error) { + json, err := p.MessageRequestJson(messageName) + if err!= nil { + return nil, err + } + return NewCodec(json) +} +func (p *Protocol) MessageRequestJson(messageName string) (string, error) { + field := p.Messages[messageName].Request[0] + avroType := TYPES_CACHE[field.TypeX.ref] + json, err := json.Marshal(avroType) + return string(json), err +} +func (p *Protocol) NewRecord(typeName string) (*Record, error) { + avroType := TYPES_CACHE[typeName] + json, err := json.Marshal(avroType) + if err!= nil { + return nil, err + } + return NewRecord(RecordSchema(string(json))) +} diff --git a/protocol_test.go b/protocol_test.go new file mode 100644 index 0000000..13f6b8a --- /dev/null +++ b/protocol_test.go @@ -0,0 +1,113 @@ +package goavro + +import ( + "testing" + "bytes" + "encoding/json" + "reflect" +) + +func TestMD5(t *testing.T) { + proto, err := NewProtocol() + if err!=nil { + t.Fatal(err) + } + expected := []byte("\x86\xaa\xda\xe2\xc4\x54\x74\xc0\xfe\x93\xff\xd0\xf2\x35\x0a\x65") + if !reflect.DeepEqual(expected, proto.MD5) { + t.Errorf("MD5 not equals: %x / %x", expected, proto.MD5) + } + + +} + + +func TestProtoParse(t *testing.T) { + proto, err := NewProtocol() + if err!=nil { + t.Fatal(err) + } + if "AvroSourceProtocol" !=proto.Name { + t.Errorf("Proto Name not pared; Expected AvroSourceProtocol / actual %#v (%#v", proto.Name, proto) + } + + if len(proto.MD5)==0 { + t.Errorf("Proto MD5 not calculated; actual %#v ", proto) + } + + if len(proto.Types)!=2 { t.Errorf("Types not parsed; Expect 2, actual %i", len(proto.Types)) } + + if len(proto.Messages)!=2 { + t.Errorf("Message not parsed: Expected2 , actual %i", len(proto.Messages)) + } + + message, ok := proto.Messages["append"] + if !ok { + t.Errorf("Message append not found : %v", proto.Messages) + } + if len(message.Request) !=1 { + t.Errorf("Message request not equals to 1: %v", message.Request) + } + if message.Request[0].Name!="event" { + t.Errorf("Request parameter not equals event / %v", message.Request[0].Name) + } + if message.Request[0].TypeX.ref!="AvroFlumeEvent" { + t.Errorf("Request parameter type not equals AvroFlumeEvent / %v", message.Request[0].TypeX.ref) + } + typeFound, found := TYPES_CACHE[message.Request[0].TypeX.ref] + if !found { + t.Errorf("Type not found on cache %v", message.Request[0].TypeX.ref) + } + if !reflect.DeepEqual(typeFound, *message.Request[0].TypeX.ProtocolType) { + t.Errorf("Type not equals with cache %v / %v",typeFound, *message.Request[0].TypeX.ProtocolType) + } +} + + +func jsonCompact(in string) (out string) { + var json_bytes = []byte(in) + buffer := new(bytes.Buffer) + json.Compact(buffer, json_bytes) + out = buffer.String() + return +} + +func TestToJson(t *testing.T) { + protocol, err := NewProtocol() + if err!= nil { + t.Fatal("%#v", err) + } + + result, err := protocol.Json() + if err !=nil { + t.Fatal("%#v", err) + } + if result!= jsonCompact(proto) { + t.Errorf("Proto to Json not equals; Expected \n%#v\nactual \n%#v",jsonCompact(proto), result) + } +} + +func TestGetCodec(t *testing.T) { + protocol, err := NewProtocol() + if err!= nil { + t.Fatal("%#v", err) + } + + + flumeRecord, errFlume := protocol.NewRecord("AvroFlumeEvent") + if errFlume != nil { + t.Fatal(errFlume) + } + headers := make(map[string]interface{}) + headers["host_header"] = "127.0.0.1" + flumeRecord.Set("headers", headers) + flumeRecord.Set("body", []byte("test")) + bb := new(bytes.Buffer) + codec, err := protocol.MessageRequestCodec("append") + if err != nil { + t.Fatal(err) + } + codec.Encode(bb, flumeRecord) + + t.Logf("AvroFlumeEvent test: %v", bb) + +} diff --git a/requestor.go b/requestor.go new file mode 100644 index 0000000..8eaefa9 --- /dev/null +++ b/requestor.go @@ -0,0 +1,308 @@ +package goavro + +import ( + "bytes" + "fmt" + "io" + "log" + "github.com/sebglon/goavro/transceiver" +) + +var REMOTE_HASHES map[string][]byte +var REMOTE_PROTOCOLS map[string]Protocol + +var BUFFER_HEADER_LENGTH = 4 +var BUFFER_SIZE = 8192 + +var META_WRITER Codec +var META_READER Codec +var HANDSHAKE_REQUESTOR_READER Codec + +type Requestor struct { + // Base class for the client side of protocol interaction. + local_protocol Protocol + transceiver transceiver.Transceiver + remote_protocol Protocol + remote_hash []byte + send_protocol bool + send_handshake bool +} +func init() { + var err error + HANDSHAKE_REQUESTOR_READER, err = NewCodec(handshakeResponseshema) + if err!=nil { + log.Fatal(err) + } + META_WRITER, err = NewCodec(metadataSchema) + if err!=nil { + log.Fatal(err) + } + META_READER, err = NewCodec(metadataSchema) + if err!=nil { + log.Fatal(err) + } + +} + +func NewRequestor(localProto Protocol, transceiver transceiver.Transceiver) *Requestor { + + r := &Requestor{ + local_protocol: localProto, + transceiver: transceiver, +// remote_protocol: nil, +// remote_hash: nil, + send_protocol: false, + send_handshake: true, + } + transceiver.InitHandshake(r.write_handshake_request, r.read_handshake_response) + return r +} + + +func (a *Requestor) RemoteProtocol(proto Protocol) { + a.remote_protocol = proto + REMOTE_PROTOCOLS[proto.Name] = proto +} + +func (a *Requestor) RemoteHash(hash []byte) { + a.remote_hash = hash + REMOTE_HASHES[a.remote_protocol.Name] = hash +} + +func (a *Requestor) Request(message_name string, request_datum interface{}) error { + // wrtie a request message and reads a response or error message. + // build handshale and call request + frame1 := new(bytes.Buffer) + frame2 := new(bytes.Buffer) + + err := a.write_call_requestHeader(message_name, frame1) + if err!=nil { + return err + } + err = a.write_call_request(message_name, request_datum, frame2) + if err!=nil { + return err + } + + // sen the handshake and call request; block until call response + buffer_writers := []bytes.Buffer{*frame1, *frame2} + responses, err := a.transceiver.Transceive(buffer_writers) + + if err!=nil { + return fmt.Errorf("Fail to transceive %v", err) + } + //buffer_decoder := bytes.NewBuffer(decoder) + // process the handshake and call response + + if len(responses) >0 { + a.read_call_responseCode(responses[1]) + if err != nil { + return err + } + // a.Request(message_name, request_datum) + } + return nil +} + +func (a *Requestor) write_handshake_request() (handshake []byte ,err error) { + buffer := new(bytes.Buffer) + defer buffer.Write(handshake) + local_hash :=a.local_protocol.MD5 + remote_name := a.remote_protocol.Name + remote_hash := REMOTE_HASHES[remote_name] + if len(remote_hash)==0 { + remote_hash = local_hash + a.remote_protocol = a.local_protocol + } + + record, err := NewRecord(RecordSchema(handshakeRequestshema)) + if err != nil { + err = fmt.Errorf("Avro fail to init record handshakeRequest %v",err) + return + } + + record.Set("clientHash", local_hash) + record.Set("serverHash", remote_hash) + record.Set("meta", make(map[string]interface{})) + codecHandshake, err := NewCodec(handshakeRequestshema) + if err != nil { + err = fmt.Errorf("Avro fail to get codec handshakeRequest %v",err) + return + } + + if a.send_protocol { + json, err := a.local_protocol.Json() + if err!=nil { + return nil ,err + } + record.Set("clientProtocol", json) + } + + + + if err = codecHandshake.Encode(buffer, record); err !=nil { + err = fmt.Errorf("Encode handshakeRequest ",err) + return + } + + + return +} + +func (a *Requestor) write_call_request(message_name string, request_datum interface{}, frame io.Writer) (err error) { + codec, err := a.local_protocol.MessageRequestCodec(message_name) + + if err != nil { + return fmt.Errorf("fail to get codec for message %s: %v", message_name, err) + } + a.write_request(codec, request_datum, frame) + return err +} + +func (a *Requestor) write_call_requestHeader(message_name string, frame1 io.Writer) error { + // The format of a call request is: + // * request metadata, a map with values of type bytes + // * the message name, an Avro string, followed by + // * the message parameters. Parameters are serialized according to + // the message's request declaration. + + // TODO request metadata (not yet implemented) + request_metadata := make(map[string]interface{}) + // encode metadata + if err := META_WRITER.Encode(frame1, request_metadata); err != nil { + return fmt.Errorf("Encode metadata ", err) + } + + + stringCodec.Encode(frame1,message_name) + return nil +} + +func (a *Requestor) write_request(request_codec Codec, request_datum interface{}, buffer io.Writer) error { + + + if err := request_codec.Encode(buffer, request_datum); err != nil { + return fmt.Errorf("Fail to encode request_datum %v", err) + } + return nil +} + +func (a *Requestor) read_handshake_response(decoder io.Reader) (bool, error) { + if !a.send_handshake { + return true, nil + } + + datum, err := HANDSHAKE_REQUESTOR_READER.Decode(decoder) + if err != nil { + + return false,fmt.Errorf("Fail to decode %v with error %v", decoder, err) + } + + record, ok := datum.(*Record) + if !ok { + return false, fmt.Errorf("Fail to decode handshake %T", datum) + } + + var we_have_matching_schema =false + match, err := record.Get("match") + if err!= nil { + return false, err + } + switch match { + case "BOTH": + a.send_protocol = false + we_have_matching_schema =true + case "CLIENT": + err = fmt.Errorf("Handshake failure. match == CLIENT") + if a.send_protocol { + field , err := record.Get("serverProtocol") + if err!= nil { + return false, err + } + a.remote_protocol = field.(Protocol) + field, err = record.Get("serverHash") + if err!= nil { + return false, err + } + a.remote_hash = field.([]byte) + + a.send_protocol = false + we_have_matching_schema = true + } + case "NONE": + err = fmt.Errorf("Handshake failure. match == NONE") + if a.send_protocol { + field , err := record.Get("serverProtocol") + if err!= nil { + return false, err + } + a.remote_protocol = field.(Protocol) + field, err = record.Get("serverHash") + if err!= nil { + return false, err + } + a.remote_hash = field.([]byte) + + a.send_protocol = true + } + default: + err = fmt.Errorf("Unexpected match: #{match}") + } + + return we_have_matching_schema, nil +} + +func (a *Requestor) read_call_responseCode(decoder io.Reader) error { + // The format of a call response is: + // * response metadata, a map with values of type bytes + // * a one-byte error flag boolean, followed by either: + // * if the error flag is false, + // the message response, serialized per the message's response schema. + // * if the error flag is true, + // the error, serialized per the message's error union schema. + _, err := META_READER.Decode(decoder) + + if err != nil { + return fmt.Errorf("Decode metadata ", err) + } + return nil + +} + + +func (a *Requestor) read_call_responseMessage(message_name string, decoder io.Reader ) error { + codec, err := a.local_protocol.MessageResponseCodec(message_name) + + if err != nil { + return fmt.Errorf("fail to get response codec for message %s: %v", message_name, err) + } + + datum, err := codec.Decode(decoder); + if err != nil { + + return fmt.Errorf("Fail to decode %v with error %v", decoder, err) + } + status, ok := datum.(string) + if !ok { + return fmt.Errorf("Fail to decode Status response %v", datum) + } + + switch status { + case "OK": + err = nil + case "FAILED": + err = fmt.Errorf("Reponse failure. status == %v", status) + + case "UNKNOWN": + err = fmt.Errorf("Reponse failure. match == %v", status) + + default: + err = fmt.Errorf("Unexpected status: %v", status) + } + + return err + + +} + + diff --git a/requestor_test.go b/requestor_test.go new file mode 100644 index 0000000..149c33b --- /dev/null +++ b/requestor_test.go @@ -0,0 +1,239 @@ +package goavro + +import ( + "testing" + "net" + "bytes" + "reflect" + netty "github.com/sebglon/goavro/transceiver/netty" + "github.com/sebglon/goavro/transceiver" + "runtime" + "strconv" +) + +func TestWrite_handshake_request(t *testing.T) { + //t.SkipNow() + + transceiver, err := netty.NewTransceiver(transceiver.Config{Host:HOST, Port:PORT}) + if err != nil { + t.Fatal(err) + } + protocol, err := NewProtocol() + if err != nil { + t.Fatal(err) + } + requestor := NewRequestor(protocol, transceiver) + + + hds, err := requestor.write_handshake_request() + // conn.Write(bb.Bytes()) + t.Logf("Handshake_request size %v %x\n",len(hds), hds) + t.Logf( "Handshake_request %v\n", hds) + + refHandshake := []byte("\x86\xaa\xda\xe2\xc4\x54\x74\xc0\xfe\x93\xff\xd0\xf2\x35\x0a\x65\x00\x86\xaa\xda\xe2\xc4\x54\x74\xc0\xfe\x93\xff\xd0\xf2\x35\x0a\x65\x02\x00") + //bytes := bb.Bytes() + //if !reflect.DeepEqual(refHandshake, bytes) { + // t.Fatalf("Handshake not equals to ref %n%x, %n%x", len(refHandshake), refHandshake, len(bytes), bytes) + //} + + + codecHandshake, err := NewCodec(handshakeRequestshema) + if err != nil { + t.Fatal(err) + } + record, err := codecHandshake.Decode(bytes.NewBuffer(refHandshake)) + if err != nil { + t.Fatal(err) + } + t.Logf("\nHandshake_request decoded %v\n", record) + +} + +func TestRead_handshake_reponse(t *testing.T) { + codecHandshake, err := NewCodec(handshakeResponseshema) + if err != nil { + t.Fatal(err) + } + record, err := NewRecord(RecordSchema(handshakeResponseshema)) + if err != nil { + t.Fatal(err) + } + record.Set("match", Enum{"match", "BOTH"}) + record.Set("serverProtocol", nil) + record.Set("serverHash", nil) + record.Set("meta", nil) + + bb := new(bytes.Buffer) + err = codecHandshake.Encode(bb, record) + if err != nil { + t.Fatal(err) + } + t.Logf("Encode HandshakeResponse %v", bb.Bytes()) + + _, err = codecHandshake.Decode(bytes.NewReader(bb.Bytes())) + if err != nil { + t.Fatal(err) + } + + transceiver, err := netty.NewTransceiver(transceiver.Config{Host:HOST, Port:PORT}) + if err != nil { + t.Fatal(err) + } + + protocol, err := NewProtocol() + if err != nil { + t.Fatal(err) + } + requestor := NewRequestor(protocol, transceiver) + + _, err = requestor.read_handshake_response(bytes.NewReader(bb.Bytes())) + if err != nil { + t.Fatal(err) + } +} + +func TestWrite_call_request(t * testing.T) { + //t.SkipNow() + transceiver, err := netty.NewTransceiver(transceiver.Config{Host:HOST, Port:PORT}) + + protocol, err := NewProtocol() + if err != nil { + t.Fatal(err) + } + requestor := NewRequestor(protocol, transceiver) + + bb := new(bytes.Buffer) + datum, err := protocol.NewRecord("AvroFlumeEvent") + if err != nil { + t.Fatal(err) + } + + headers := make(map[string]interface{}) + headers["host_header"] = "127.0.0.1" + datum.Set("headers", headers) + datum.Set("body", []byte("2016-08-02 14:45:38|flume.composantTechnique_IS_UNDEFINED|flume.application_IS_UNDEFINED|flume.client_IS_UNDEFINED|flume.plateforme_IS_UNDEFINED|instance_IS_UNDEFINED|logname_IS_UNDEFINED|WARN |test.LogGenerator|test !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")) + + requestor.write_call_request("append", datum, bb) + // conn.Write(bb.Bytes()) + t.Logf("\nCall_request size %v %v\n", bb.Len(), bb.Bytes()) + t.Logf("\nCall_request %v\n", bb.String()) + + codec, err := protocol.MessageRequestCodec("append") + if err != nil { + t.Fatal(err) + } + value, err := codec.Decode(bb) + if err != nil { + t.Fatal(err) + } + + if !reflect.DeepEqual(datum, value) { + t.Fatalf("Request not equals to ref %x, %x", datum, value) + } + } + +func TestWrite_call_requestHeader(t * testing.T) { + //t.SkipNow() + transceiver, err := netty.NewTransceiver(transceiver.Config{Host:HOST, Port:PORT}) + + protocol, err := NewProtocol() + if err != nil { + t.Fatal(err) + } + requestor := NewRequestor(protocol, transceiver) + + bb := new(bytes.Buffer) + + requestor.write_call_requestHeader("append", bb) + + refHeader := []byte("\x00\x0c\x61\x70\x70\x65\x6e\x64") + bytes := bb.Bytes() + if !reflect.DeepEqual(refHeader, bytes) { + t.Fatalf("Request_Header not equals to ref %n%x, %n%x", len(refHeader), refHeader, len(bytes), bytes) + } + } + +func TestRead_call_responseMessage(t * testing.T) { + + transceiver, err := netty.NewTransceiver(transceiver.Config{Host:HOST, Port:PORT}) + + protocol, err := NewProtocol() + if err != nil { + t.Fatal(err) + } + requestor := NewRequestor(protocol, transceiver) + + codec, err := protocol.MessageResponseCodec("append") + if err != nil { + t.Fatal(err) + } + bb := new(bytes.Buffer) + codec.Encode(bb, Enum{"Status", "OK"}) + t.Logf("Bytes for OK %x", bb.Bytes()) + + err = requestor.read_call_responseMessage("append", bb) + if err != nil { + t.Fatal(err) + } + + codec.Encode(bb, Enum{"Status", "FAILED"}) + t.Logf("Bytes for FAILED %x", bb.Bytes()) + err = requestor.read_call_responseMessage("append", bb) + if err == nil || err.Error() != "Reponse failure. status == FAILED" { + t.Fatalf("Status FAILED can return error") + } + + } + + +const ( + RECV_BUF_LEN = 1024 + NETWORK = "tcp" + HOST = "127.0.0.1" + PORT=6666 + ADDR="127.0.0.1:6666" +) + + +func init() { + numProcs := runtime.NumCPU() + if numProcs < 2 { + numProcs = 2 + } + runtime.GOMAXPROCS(numProcs) + + listener, err := net.Listen(NETWORK, "0.0.0.0:"+strconv.Itoa(PORT)) + if err != nil { + println("error listening:", err.Error()) + } + go func() { + for { + conn, err := listener.Accept() + if err != nil { + println("Error accept:", err.Error()) + return + } + go EchoFunc(conn) + } + }() +} + +func EchoFunc(conn net.Conn) { + for { + buf := make([]byte, RECV_BUF_LEN) + n, err := conn.Read(buf) + if err != nil { + println("Error reading:", err.Error()) + return + } + println("received ", n, " bytes of data =", string(buf)) + n, err = conn.Write(buf) + if err != nil { + println("Error writing:", err.Error()) + return + } + println("sended ", n, " bytes of data =", string(buf)) + } +} + + diff --git a/schema.go b/schema.go new file mode 100644 index 0000000..c221cd4 --- /dev/null +++ b/schema.go @@ -0,0 +1,34 @@ +package goavro + + +const handshakeRequestshema = ` +{ + "type": "record", + "name": "HandshakeRequest", "namespace":"org.apache.avro.ipc", + "fields": [ + {"name": "clientHash", + "type": {"type": "fixed", "name": "MD5", "size": 16}}, + {"name": "clientProtocol", "type": ["null", "string"]}, + {"name": "serverHash", "type": "MD5"}, + {"name": "meta", "type": ["null", {"type": "map", "values": "bytes"}]} + ] +} +` + +const handshakeResponseshema = ` +{ + "type": "record", + "name": "HandshakeResponse", "namespace": "org.apache.avro.ipc", + "fields": [ + {"name": "match", + "type": {"type": "enum", "name": "HandshakeMatch", + "symbols": ["BOTH", "CLIENT", "NONE"]}}, + {"name": "serverProtocol", + "type": ["null", "string"]}, + {"name": "serverHash", + "type": ["null", {"type": "fixed", "name": "MD5", "size": 16}]}, + {"name": "meta", + "type": ["null", {"type": "map", "values": "bytes"}]} + ] +} +` diff --git a/transceiver/Tranceiver.go b/transceiver/Tranceiver.go new file mode 100644 index 0000000..37767e0 --- /dev/null +++ b/transceiver/Tranceiver.go @@ -0,0 +1,35 @@ +package transceiver + +import ( + "bytes" + "io" + "time" +) + +type WriteHandshake func() ([]byte, error) +type ReadHandshake func(io.Reader) (bool, error) +type Transceiver interface { + Transceive(request []bytes.Buffer) ([]io.Reader, error) + InitHandshake(WriteHandshake, ReadHandshake ) + Close() + + + +} + + + +type Config struct { + Port int `json:"port"` + Host string `json:"host"` + Network string `json:"network"` + SocketPath string `json:"socket_path"` + Timeout time.Duration `json:"timeout"` + AsyncConnect bool `json:"async_connect"` + BufferLimit int `json:"buffer_limit"` + RetryWait int `json:"retry_wait"` + MaxRetry int `json:"max_retry"` + InitialCap int `json:"initial_cap"` + MaxCap int `json:"max_cap"` +} + diff --git a/transceiver/connection.go b/transceiver/connection.go new file mode 100644 index 0000000..886a2c2 --- /dev/null +++ b/transceiver/connection.go @@ -0,0 +1,46 @@ +package transceiver + +import ( + "io" + "net" + "strconv" +) + +type HandShakeConn interface { + io.ReadWriteCloser + IsChecked() bool + Checked(bool) + GetConn() (net.Conn, error) +} + +type Connection struct { + net.Conn + checked bool + bad bool +} + + +func NewConnection(config Config) (*Connection, error) { + + conn := &Connection{} + var err error + switch config.Network { + case "tcp": + conn.Conn, err = net.DialTimeout(config.Network, config.Host+":"+strconv.Itoa(config.Port), config.Timeout) + case "unix": + conn.Conn, err = net.DialTimeout(config.Network, config.SocketPath, config.Timeout) + default: + err = net.UnknownNetworkError(config.Network) + } + + return conn, err +} + +func (c *Connection) Checked(check bool) { + c.checked = check +} + +func (c *Connection) IsChecked() bool{ + return c.checked +} + diff --git a/transceiver/netty/Netty.go b/transceiver/netty/Netty.go new file mode 100644 index 0000000..663a887 --- /dev/null +++ b/transceiver/netty/Netty.go @@ -0,0 +1,131 @@ +package netty + +import ( + "bytes" + "encoding/binary" + "io" + "sync" + "github.com/sebglon/goavro/transceiver" + "fmt" + "log" +) + +type NettyTransceiver struct { + transceiver.Pool + mu sync.Mutex + pending []byte + alreadyCalled bool + writeHandShake transceiver.WriteHandshake + readHandshake transceiver.ReadHandshake +} +func NewTransceiver(config transceiver.Config) (f* NettyTransceiver, err error){ + f = &NettyTransceiver{} + pool , err := transceiver.NewPool(config) + if err !=nil { + return + } + f.Pool =*pool + return +} + +func (t *NettyTransceiver) InitHandshake(writer transceiver.WriteHandshake,reader transceiver.ReadHandshake ) { + t.writeHandShake=writer + t.readHandshake=reader +} + + + + + +func (t *NettyTransceiver) Transceive(requests []bytes.Buffer) ([]io.Reader, error){ + nettyFrame := new(bytes.Buffer) + t.Pack(nettyFrame, requests) + log.Printf("%#v",t.Pool) + conn, pc, err := t.Pool.Conn() + + if err!=nil { + return nil, err + } + defer pc.Close() + + if !conn.IsChecked() { + frame0 := requests[0] + if t.writeHandShake ==nil { + return nil, fmt.Errorf("InitHandshake not called before Transceive") + } + handshake, err := t.writeHandShake() + if err!=nil { + return nil, err + } + + requests[0].Reset() + _, err = requests[0].Write(append( handshake, frame0.Bytes()...)) + if err!=nil { + return nil, err + } + } + + bodyBytes, err := t.Pool.Call(conn, pc, nettyFrame.Bytes()) + if err != nil { + return nil, err + } + + + + resps, err := t.Unpack(bodyBytes) + if err != nil { + return nil, err + } + + if !conn.IsChecked() && len(resps)>1{ + ok, err := t.readHandshake(resps[0]) + if err!=nil { + return nil, err + } + conn.Checked(ok) + if !ok { + return nil, fmt.Errorf("Fail to validate Handshake") + } + return resps[1:], nil + } else { + return resps, nil + } + +} + +func (t *NettyTransceiver) Pack(frame *bytes.Buffer, requests []bytes.Buffer) { + // Set Netty Serial + + nettySerial :=make([]byte, 4) + binary.BigEndian.PutUint32(nettySerial, uint32(1)) + frame.Write(nettySerial) + + + nettySizeBuffer :=make([]byte, 4) + binary.BigEndian.PutUint32(nettySizeBuffer, uint32(len(requests))) + frame.Write(nettySizeBuffer) + + for _, request := range requests { + requestSize :=make([]byte, 4) + binary.BigEndian.PutUint32(requestSize, uint32(request.Len())) + frame.Write(requestSize) + frame.Write(request.Bytes()) + } +} + +func (t *NettyTransceiver) Unpack(frame []byte) ([]io.Reader, error) { + nettyNumberFame := binary.BigEndian.Uint32(frame[4:8]) + result := make([]io.Reader, nettyNumberFame) + startFrame := uint32(8) + i:=uint32(0) + for i < nettyNumberFame { + messageSize := uint32(binary.BigEndian.Uint32(frame[startFrame:startFrame+4])) + message := frame[startFrame+4:startFrame+4+messageSize] + startFrame = startFrame+4+messageSize + br := bytes.NewReader(message) + result[i] = br + i++ + } + + return result, nil +} diff --git a/transceiver/netty/Netty_test.go b/transceiver/netty/Netty_test.go new file mode 100644 index 0000000..2099595 --- /dev/null +++ b/transceiver/netty/Netty_test.go @@ -0,0 +1,132 @@ +package netty + +import ( + "testing" + + "bytes" + "reflect" + "io/ioutil" + "runtime" + "net" + "github.com/sebglon/goavro/transceiver" + "strconv" + "io" +) + +const ( + RECV_BUF_LEN = 1024 + NETWORK = "tcp" + HOST = "127.0.0.1" + PORT=6666 + ADDR="127.0.0.1:6666" +) + + +func init() { + numProcs := runtime.NumCPU() + if numProcs < 2 { + numProcs = 2 + } + runtime.GOMAXPROCS(numProcs) + + listener, err := net.Listen(NETWORK, "0.0.0.0:"+strconv.Itoa(PORT)) + if err != nil { + println("error listening:", err.Error()) + } + go func() { + for { + conn, err := listener.Accept() + if err != nil { + println("Error accept:", err.Error()) + return + } + go EchoFunc(conn) + } + }() +} + +func EchoFunc(conn net.Conn) { + for { + buf := make([]byte, RECV_BUF_LEN) + n, err := conn.Read(buf) + if err != nil { + println("Error reading:", err.Error()) + return + } + println("received ", n, " bytes of data =", string(buf)) + n, err = conn.Write(buf) + if err != nil { + println("Error writing:", err.Error()) + return + } + println("sended ", n, " bytes of data =", string(buf)) + } +} + +func TestTransceive(t *testing.T) { + f, err := NewTransceiver(transceiver.Config{Network:NETWORK, Host:HOST, Port:PORT}) + if err != nil { + t.Fatal(err) + } + defer f.Close() + f.InitHandshake(func()([]byte, error){return make([]byte,1), nil},func(io.Reader)(bool, error){return true, nil}) + + + msg := "This is test writing." + bmsg := make([]bytes.Buffer, 1) + bmsg[0] = *bytes.NewBuffer([]byte(msg)) + + resp, err := f.Transceive(bmsg) + if err != nil { + t.Fatal(err.Error()) + } + brcv := make([]byte, len([]byte(msg))) + resp[0].Read(brcv) + rcv := string(brcv) + if rcv != msg { + t.Errorf("got %s, except %s", rcv, msg) + } + +} +func TestPack(t *testing.T) { + transceiver := new(NettyTransceiver) + frame := new(bytes.Buffer) + transceiver.Pack(frame,*new([]bytes.Buffer)) + if frame.Len()!=8 { + t.Fatalf("Frame not equals to 8: %x", frame.Len()) + } + reflect.DeepEqual(frame.Bytes(), []byte("\x00\x00\x00\x01\x00\x00\x00\x00")) + frame.Reset() + + requests:= make([]bytes.Buffer, 2) + requests[0] = *bytes.NewBuffer([]byte("buf1")) + requests[1] = *bytes.NewBuffer([]byte("buf2xxxx")) + transceiver.Pack(frame, requests) + expectedSize:= 8+ requests[0].Len()+4 + requests[1].Len() + 4 + if frame.Len()!= expectedSize { + t.Fatalf("Frame not equals to %x: %x / %x",expectedSize, frame.Len(), frame.Bytes()) + } +} + +func TestUnpack(t *testing.T) { + transceiver := new(NettyTransceiver) + frame := []byte("\x00\x00\x00\x01\x00\x00\x00\x02\x00\x00\x00\x01\x0a\x00\x00\x00\x01\x0b") + respons, err := transceiver.Unpack(frame) + if err != nil { + t.Fatalf("%v",err) + } + + if len(respons)!=2 { + t.Fatalf("Number of reponse frame not equals %x / %x",2, len(respons)) + } + + var resp1 []byte + var resp2 []byte + resp1, _ =ioutil.ReadAll(respons[0]) + respons[1].Read(resp2) + if !reflect.DeepEqual(resp1, []byte("\x0a")) && !reflect.DeepEqual(resp2, []byte("\x0b")) { + t.Fatalf("Reponse message not equals (0) %x/%x; (1) %x/%x","\x0a", respons[0], "\x0b", respons[1]) + } + +} + diff --git a/transceiver/pool.go b/transceiver/pool.go new file mode 100644 index 0000000..0270955 --- /dev/null +++ b/transceiver/pool.go @@ -0,0 +1,141 @@ +package transceiver + +import ( + "gopkg.in/fatih/pool.v2" + "net" + "fmt" + "sync" + "errors" + "log" + "time" +) + + +var ( + errPoolClosed = errors.New("Avro transceiver: Pool Closed") +) +type Pool struct { + Config + pool pool.Pool + mu sync.RWMutex + closed bool +} + +const ( + defaultHost = "127.0.0.1" + defaultNetwork = "tcp" + defaultSocketPath = "" + defaultPort = 63001 + defaultTimeout = 3 * time.Second + defaultBufferLimit = 8 * 1024 * 1024 + defaultRetryWait = 500 + defaultMaxRetry = 13 + defaultInitialCap = 2 + defaultMaxCap = 5 + defaultReconnectWaitIncreRate = 1.5 +) + +func NewPool(config Config) (*Pool, error) { + if config.Network == "" { + config.Network = defaultNetwork + } + if config.Host == "" { + config.Host = defaultHost + } + if config.Port == 0 { + config.Port = defaultPort + } + if config.SocketPath == "" { + config.SocketPath = defaultSocketPath + } + if config.Timeout == 0 { + config.Timeout = defaultTimeout + } + if config.BufferLimit == 0 { + config.BufferLimit = defaultBufferLimit + } + if config.RetryWait == 0 { + config.RetryWait = defaultRetryWait + } + if config.MaxRetry == 0 { + config.MaxRetry = defaultMaxRetry + } + if config.InitialCap == 0 { + config.InitialCap = defaultInitialCap + } + if config.MaxCap == 0 { + config.MaxCap = defaultMaxCap + } + p, err := pool.NewChannelPool(config.InitialCap,config.MaxCap, func() (net.Conn, error) { + conn, err := NewConnection(config) + if err != nil { + return nil, fmt.Errorf("\nFail to init connec, %#v \n%v",config,err) + } + return conn, err + }) + if err != nil { + return nil, err + } + + pool := &Pool{ + pool: p, + Config: config, + } + log.Printf("%#v",pool.pool) + return pool, nil + +} + +func (p *Pool) Conn() (*Connection, *pool.PoolConn, error) { + p.mu.RLock() + defer p.mu.RUnlock() + + if p.closed { + return nil, nil, errPoolClosed + } + + + nc, err := p.pool.Get() + if err != nil { + return nil, nil, err + } + + log.Printf(" %T %#v", nc,nc) + + pc, ok := nc.(*pool.PoolConn) + if !ok { + // This should never happen! + return nil, nil, fmt.Errorf("Invalid connection in pool") + } + + conn, ok := pc.Conn.(*Connection) + if !ok { + // This should never happen! + return nil, nil, fmt.Errorf("Invalid connection in pool") + } + + return conn, pc, nil +} + +func (p *Pool) Call(conn *Connection, pc *pool.PoolConn, req []byte) (resp []byte, err error) { + if err != nil { + return + } + defer pc.Close() + + _,err= conn.Write(req) + + if err != nil { + return nil, err + } + resp = make([]byte, 1024) + _,err = conn.Read(resp) + + if err != nil { + return nil, err + } + return +} +func (t *Pool) Close() { + t.pool.Close() +} \ No newline at end of file